ARTEMIS-792 Add additional tests for AMQP protocol

Adds several tests for AMQP expectations in various use cases.
This commit is contained in:
Timothy Bish 2016-10-11 12:16:28 -04:00
parent 0a1e6bdd5e
commit 7a8b7e9cfb
6 changed files with 823 additions and 39 deletions

View File

@ -21,8 +21,12 @@ import java.net.URI;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -35,9 +39,11 @@ import org.junit.Before;
*/ */
public class AmqpClientTestSupport extends ActiveMQTestBase { public class AmqpClientTestSupport extends ActiveMQTestBase {
ActiveMQServer server; private boolean useSSL;
LinkedList<AmqpConnection> connections = new LinkedList<>(); protected JMSServerManager serverManager;
protected ActiveMQServer server;
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected AmqpConnection addConnection(AmqpConnection connection) { protected AmqpConnection addConnection(AmqpConnection connection) {
connections.add(connection); connections.add(connection);
@ -48,9 +54,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
server = createServer();
server = createServer(true, true);
server.start();
} }
@After @After
@ -63,18 +67,36 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ignored.printStackTrace(); ignored.printStackTrace();
} }
} }
if (serverManager != null) {
try {
serverManager.stop();
} catch (Throwable ignored) {
ignored.printStackTrace();
}
serverManager = null;
}
server.stop(); server.stop();
super.tearDown(); super.tearDown();
} }
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
serverManager.start();
server.start();
return server;
}
public Queue getProxyToQueue(String queueName) { public Queue getProxyToQueue(String queueName) {
return server.locateQueue(SimpleString.toSimpleString(queueName)); return server.locateQueue(SimpleString.toSimpleString(queueName));
} }
private String connectorScheme = "amqp";
private boolean useSSL;
public String getTestName() { public String getTestName() {
return "jms.queue." + getName(); return "jms.queue." + getName();
} }
@ -83,14 +105,9 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
} }
public AmqpClientTestSupport(String connectorScheme, boolean useSSL) { public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
this.connectorScheme = connectorScheme;
this.useSSL = useSSL; this.useSSL = useSSL;
} }
public String getConnectorScheme() {
return connectorScheme;
}
public boolean isUseSSL() { public boolean isUseSSL() {
return useSSL; return useSSL;
} }
@ -99,30 +116,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
return ""; return "";
} }
protected boolean isUseTcpConnector() {
return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
}
protected boolean isUseSslConnector() {
return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
}
protected boolean isUseNioConnector() {
return !isUseSSL() && connectorScheme.contains("nio");
}
protected boolean isUseNioPlusSslConnector() {
return isUseSSL() && connectorScheme.contains("nio");
}
protected boolean isUseWsConnector() {
return !isUseSSL() && connectorScheme.contains("ws");
}
protected boolean isUseWssConnector() {
return isUseSSL() && connectorScheme.contains("wss");
}
public URI getBrokerAmqpConnectionURI() { public URI getBrokerAmqpConnectionURI() {
boolean webSocket = false; boolean webSocket = false;

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Test around the handling of Deliver Annotations in messages sent and received.
*/
public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
@Test(timeout = 60000)
public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
sender.send(message);
receiver.flow(1);
Queue queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getMessageCount());
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
sender.close();
connection.close();
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Test that the broker can pass through an AMQP message with a described type in the message
* body regardless of transformer in use.
*/
public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageWithDescribedTypeInBody() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
Queue queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertNotNull(received.getDescribedType());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
connection.close();
Queue queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getMessageCount());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection jmsConnection = factory.createConnection();
try {
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = jmsSession.createQueue(getName());
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
jmsConnection.start();
Message received = jmsConsumer.receive(5000);
assertNotNull(received);
assertTrue(received instanceof BytesMessage);
} finally {
jmsConnection.close();
}
}
@Test(timeout = 60000)
public void testDescribedTypeMessageRoundTrips() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
// Send with AMQP client.
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
Queue queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getMessageCount());
// Receive and resend with OpenWire JMS client
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection jmsConnection = factory.createConnection();
try {
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = jmsSession.createQueue(getName());
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
jmsConnection.start();
Message received = jmsConsumer.receive(5000);
assertNotNull(received);
assertTrue(received instanceof BytesMessage);
MessageProducer jmsProducer = jmsSession.createProducer(destination);
jmsProducer.send(received);
} finally {
jmsConnection.close();
}
assertEquals(1, queue.getMessageCount());
// Now lets receive it with AMQP and see that we get back what we expected.
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(returned);
assertNotNull(returned.getDescribedType());
receiver.close();
connection.close();
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Tests various behaviors of broker side drain support.
*/
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver.drain(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
receiver.close();
assertEquals(0, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testPullWithNoMessageGetDrained() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName());
assertEquals(0, queueView.getMessageCount());
assertEquals(0, queueView.getDeliveringCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
connection.close();
}
@Test(timeout = 60000)
public void testPullOneFromRemote() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit());
AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertEquals(0, receiver.getReceiver().getRemoteCredit());
receiver.close();
assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
assertEquals(1, queueView.getMessagesAcknowledged());
connection.close();
}
@Test(timeout = 60000)
public void testMultipleZeroResultPulls() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName());
assertEquals(0, queueView.getMessageCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
connection.close();
}
public void sendMessages(String destinationName, int count) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = null;
try {
connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(destinationName);
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message-" + i);
sender.send(message);
}
sender.close();
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Test for scheduled message support using AMQP message annotations.
*/
public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getScheduledCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNull(received);
connection.close();
}
@Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 6000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getScheduledCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
connection.close();
}
@Test
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long delay = 6000;
message.setMessageAnnotation("x-opt-delivery-delay", delay);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getScheduledCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
connection.close();
}
}

View File

@ -16,22 +16,31 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,6 +52,255 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
@Test(timeout = 60000)
public void testCreateQueueReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queue = getProxyToQueue(getTestName());
assertNotNull(queue);
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testCreateQueueReceiverWithJMSSelector() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
markAsInvalid("Broker did not return the JMS Filter on Attach");
}
}
});
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "JMSPriority > 8");
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@SuppressWarnings("unchecked")
@Override
public void inspectOpenedResource(Receiver receiver) {
if (receiver.getRemoteSource() == null) {
markAsInvalid("Link opened with null source.");
}
Source source = (Source) receiver.getRemoteSource();
Map<Symbol, Object> filters = source.getFilter();
// Currently don't support noLocal on a Queue
if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
markAsInvalid("Broker did not return the NoLocal Filter on Attach");
}
}
});
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), null, true);
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
sendMessages(getTestName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
assertEquals(2, server.getTotalConsumerCount());
receiver2.flow(2);
assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
assertEquals(0, queueView.getMessagesAcknowledged());
receiver1.close();
receiver2.close();
assertEquals(MSG_COUNT, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
final Queue queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queueView.getMessagesAcknowledged() == 2;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
assertEquals(2, server.getTotalConsumerCount());
receiver2.flow(2);
message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queueView.getMessagesAcknowledged() == 4;
}
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
receiver1.close();
receiver2.close();
assertEquals(0, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
final Queue queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(20);
assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queueView.getDeliveringCount() >= 2;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName());
assertEquals(1, server.getTotalConsumerCount());
receiver2.flow(MSG_COUNT * 2);
AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queueView.getMessagesAcknowledged() == 2;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
receiver2.close();
assertEquals(MSG_COUNT - 2, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSimpleSendOneReceiveOne() throws Exception { public void testSimpleSendOneReceiveOne() throws Exception {
@ -476,7 +734,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() { assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisfied() throws Exception {
return destinationView.getDeliveringCount() == 0; return destinationView.getDeliveringCount() == 0;
} }
})); }));
@ -554,4 +812,21 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
public void sendMessages(String destinationName, int count) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(destinationName);
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
sender.send(message);
}
} finally {
connection.close();
}
}
} }