NO-JIRA: Added a AMQP JMS LargeMessage test

This commit is contained in:
Howard Gao 2018-04-27 20:22:28 +08:00 committed by Clebert Suconic
parent d3c83898f5
commit 8a73fdd3a2
1 changed files with 65 additions and 0 deletions
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@ -29,10 +30,12 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
@ -221,4 +224,66 @@ public class JMSDurableConsumerTest extends JMSClientTestSupport {
connection.close();
}
}
@Test(timeout = 30000)
public void testDurableConsumerLarge() throws Exception {
String durableClientId = getTopicName() + "-ClientId";
Connection connection = createConnection(durableClientId);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
final MessageConsumer consumer1 = session.createDurableSubscriber(topic, "DurbaleSub1");
final MessageConsumer consumer2 = session.createDurableSubscriber(topic, "DurbaleSub2");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
ObjectMessage objMessage = session.createObjectMessage();
BigObject bigObject = new BigObject(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
objMessage.setObject(bigObject);
producer.send(objMessage);
final AtomicReference<Message> msg1 = new AtomicReference<>();
final AtomicReference<Message> msg2 = new AtomicReference<>();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
msg1.set(consumer1.receiveNoWait());
return msg1.get() != null;
}
}, TimeUnit.SECONDS.toMillis(25), TimeUnit.MILLISECONDS.toMillis(200)));
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
msg2.set(consumer2.receiveNoWait());
return msg2.get() != null;
}
}, TimeUnit.SECONDS.toMillis(25), TimeUnit.MILLISECONDS.toMillis(200)));
assertNotNull("Should have received a message by now.", msg1.get());
assertTrue("Should be an instance of TextMessage", msg1.get() instanceof ObjectMessage);
assertNotNull("Should have received a message by now.", msg2.get());
assertTrue("Should be an instance of TextMessage", msg2.get() instanceof ObjectMessage);
} finally {
connection.close();
}
}
public static class BigObject implements Serializable {
private char[] contents;
public BigObject(int size) {
contents = new char[size];
for (int i = 0; i < size; i++) {
contents[i] = 'X';
}
}
}
}