This commit is contained in:
Clebert Suconic 2018-11-14 10:22:46 -05:00
commit 256e7c8aae
3 changed files with 264 additions and 132 deletions

View File

@ -23,7 +23,6 @@ import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
@ -75,7 +74,7 @@ public class CliProducerTest extends CliTestBase {
private void checkSentMessages(Session session, String address, String messageBody) throws Exception {
final boolean isCustomMessageBody = messageBody != null;
boolean fqqn = false;
if (address.startsWith("fqqn://")) fqqn = true;
if (address.contains("::")) fqqn = true;
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, fqqn);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
@ -84,17 +83,10 @@ public class CliProducerTest extends CliTestBase {
}
}
private Session createSession() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
return session;
}
@Test
public void testSendMessage() throws Exception {
String address = "test";
Session session = createSession();
Session session = createSession(connection);
produceMessages(address, TEST_MESSAGE_COUNT);
@ -108,7 +100,7 @@ public class CliProducerTest extends CliTestBase {
String fqqn = address + "::" + queue;
createQueue("--multicast", address, queue);
Session session = createSession();
Session session = createSession(connection);
produceMessages("topic://" + address, TEST_MESSAGE_COUNT);
@ -123,7 +115,7 @@ public class CliProducerTest extends CliTestBase {
String messageBody = new StringGenerator().generateRandomString(20);
createQueue("--multicast", address, queue);
Session session = createSession();
Session session = createSession(connection);
produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT);
@ -135,7 +127,7 @@ public class CliProducerTest extends CliTestBase {
String address = "test";
String messageBody = new StringGenerator().generateRandomString(20);
Session session = createSession();
Session session = createSession(connection);
produceMessages(address, messageBody, TEST_MESSAGE_COUNT);
@ -147,7 +139,7 @@ public class CliProducerTest extends CliTestBase {
String address = "test";
String messageBody = new StringGenerator().generateRandomString(500000);
Session session = createSession();
Session session = createSession(connection);
produceMessages(address, messageBody, TEST_MESSAGE_COUNT);

View File

@ -32,6 +32,7 @@ import org.junit.rules.TemporaryFolder;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@ -91,11 +92,11 @@ public class CliTestBase {
Artemis.internalExecute("run");
}
protected void setupAuth() throws Exception {
void setupAuth() {
setupAuth(temporaryFolder.getRoot());
}
protected void setupAuth(File folder) throws Exception {
void setupAuth(File folder) {
System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config");
}
@ -105,7 +106,7 @@ public class CliTestBase {
assertEquals(0, LibaioContext.getTotalMaxIO());
}
protected ActiveMQConnectionFactory getConnectionFactory(int serverPort) throws Exception {
protected ActiveMQConnectionFactory getConnectionFactory(int serverPort) {
return new ActiveMQConnectionFactory("tcp://localhost:" + String.valueOf(serverPort));
}
@ -121,7 +122,7 @@ public class CliTestBase {
"--auto-create-address");
}
protected void closeConnection(ActiveMQConnectionFactory cf, Connection connection) throws Exception {
void closeConnection(ActiveMQConnectionFactory cf, Connection connection) throws Exception {
try {
connection.close();
cf.close();
@ -130,6 +131,12 @@ public class CliTestBase {
}
}
protected Session createSession(Connection connection) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
return session;
}
protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
MessageConsumer consumer = session.createConsumer(destination);
@ -143,8 +150,12 @@ public class CliTestBase {
return messages;
}
protected Destination getDestination(String queueName) {
Destination getDestination(String queueName) {
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
}
Destination getTopicDestination(String queueName) {
return ActiveMQDestination.createDestination("topic://" + queueName, ActiveMQDestination.TYPE.TOPIC);
}
}

View File

@ -26,6 +26,7 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
@ -53,6 +54,7 @@ public class MessageSerializerTest extends CliTestBase {
private Connection connection;
private ActiveMQConnectionFactory cf;
private static final int TEST_MESSAGE_COUNT = 10;
@Before
@Override
@ -60,7 +62,7 @@ public class MessageSerializerTest extends CliTestBase {
setupAuth();
super.setup();
startServer();
cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
cf = getConnectionFactory(61616);
connection = cf.createConnection("admin", "admin");
}
@ -75,95 +77,35 @@ public class MessageSerializerTest extends CliTestBase {
return temporaryFolder.newFile("messages.xml");
}
@Test
public void testTextMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
sent.add(session.createTextMessage(RandomUtil.randomString()));
private List<Message> generateTextMessages(Session session, String address) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
messages.add(session.createTextMessage(RandomUtil.randomString()));
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
sendMessages(session, address, messages);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText());
}
return messages;
}
@Test
public void testObjectMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Send initial messages.
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
sent.add(session.createObjectMessage(UUID.randomUUID()));
private List<Message> generateTextMessages(Session session, Destination destination) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
messages.add(session.createTextMessage(RandomUtil.randomString()));
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
sendMessages(session, destination, messages);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
}
return messages;
}
@Test
public void testMapMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
String key = "testKey";
File file = createMessageFile();
private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
boolean fqqn = false;
if (address.contains("::")) fqqn = true;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
MapMessage m = session.createMapMessage();
m.setString(key, RandomUtil.randomString());
sent.add(m);
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
List<Message> recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, fqqn);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
}
}
@ -181,21 +123,126 @@ public class MessageSerializerTest extends CliTestBase {
}
}
private void exportMessages(String address, int noMessages, File output) throws Exception {
Artemis.main("consumer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", "" + noMessages,
"--data", output.getAbsolutePath());
private void exportMessages(String address, File output) throws Exception {
exportMessages(address, TEST_MESSAGE_COUNT, false, "test-client", output);
}
private void exportMessages(String address, int noMessages, boolean durable, String clientId, File output) throws Exception {
if (durable) {
String[] args = {"consumer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", Integer.toString(noMessages),
"--data", output.getAbsolutePath(),
"--clientID", clientId,
"--durable"};
Artemis.main(args);
} else {
String[] args = {"consumer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", Integer.toString(noMessages),
"--data", output.getAbsolutePath(),
"--clientID", clientId};
Artemis.main(args);
}
}
private void importMessages(String address, File input) throws Exception {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--data", input.getAbsolutePath());
"--user", "admin",
"--password", "admin",
"--destination", address,
"--data", input.getAbsolutePath());
}
private void createBothTypeAddress(String address) throws Exception {
Artemis.main("address", "create",
"--user", "admin",
"--password", "admin",
"--name", address,
"--anycast", "--multicast");
}
@Test
public void testTextMessageImportExport() throws Exception {
String address = "test";
File file = createMessageFile();
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, address);
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
checkSentMessages(session, messages, address);
}
@Test
public void testObjectMessageImportExport() throws Exception {
String address = "test";
File file = createMessageFile();
Session session = createSession(connection);
// Send initial messages.
List<Message> sent = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
sent.add(session.createObjectMessage(UUID.randomUUID()));
}
sendMessages(session, address, sent);
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
}
}
@Test
public void testMapMessageImportExport() throws Exception {
String address = "test";
String key = "testKey";
File file = createMessageFile();
Session session = createSession(connection);
List<Message> sent = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
MapMessage m = session.createMapMessage();
m.setString(key, RandomUtil.randomString());
sent.add(m);
}
sendMessages(session, address, sent);
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
}
}
@Test
@ -211,8 +258,7 @@ public class MessageSerializerTest extends CliTestBase {
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
// send messages to queue
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Session session = createSession(connection);
Destination queue1 = session.createQueue(address + "::" + queue1Name);
Destination queue2 = session.createQueue(address + "::" + queue2Name);
@ -221,10 +267,10 @@ public class MessageSerializerTest extends CliTestBase {
MessageConsumer consumer2 = session.createConsumer(queue2);
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", "fqqn://" + address + "::" + queue1Name,
"--message-count", "5");
"--user", "admin",
"--password", "admin",
"--destination", "fqqn://" + address + "::" + queue1Name,
"--message-count", "5");
assertNull(consumer2.receive(1000));
assertNotNull(consumer1.receive(1000));
@ -239,28 +285,115 @@ public class MessageSerializerTest extends CliTestBase {
String destination = "fqqn://" + fqqn;
File file = createMessageFile();
int noMessages = 10;
createQueue("--multicast", addr, queue);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Session session = createSession(connection);
Topic topic = session.createTopic(addr);
List<Message> messages = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
messages.add(session.createTextMessage(RandomUtil.randomString()));
}
List<Message> messages = generateTextMessages(session, topic);
sendMessages(session, topic, messages);
exportMessages(destination, noMessages, file);
exportMessages(destination, file);
importMessages(destination, file);
List<Message> recieved = consumeMessages(session, fqqn, noMessages, true);
for (int i = 0; i < noMessages; i++) {
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
checkSentMessages(session, messages, fqqn);
}
@Test
public void testAnycastToMulticastTopic() throws Exception {
String mAddress = "testMulticast";
String aAddress = "testAnycast";
String queueM1Name = "queueM1";
String queueM2Name = "queueM2";
File file = createMessageFile();
createQueue("--multicast", mAddress, queueM1Name);
createQueue("--multicast", mAddress, queueM2Name);
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, aAddress);
exportMessages(aAddress, file);
importMessages("topic://" + mAddress, file);
checkSentMessages(session, messages, queueM1Name);
checkSentMessages(session, messages, queueM2Name);
}
@Test
public void testAnycastToMulticastFQQN() throws Exception {
String mAddress = "testMulticast";
String aAddress = "testAnycast";
String queueM1Name = "queueM1";
String queueM2Name = "queueM2";
String fqqnMulticast1 = mAddress + "::" + queueM1Name;
String fqqnMulticast2 = mAddress + "::" + queueM2Name;
File file = createMessageFile();
createQueue("--multicast", mAddress, queueM1Name);
createQueue("--multicast", mAddress, queueM2Name);
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, aAddress);
exportMessages(aAddress, file);
importMessages("fqqn://" + fqqnMulticast1, file);
checkSentMessages(session, messages, fqqnMulticast1);
MessageConsumer consumer = session.createConsumer(getDestination(fqqnMulticast2));
assertNull(consumer.receive(1000));
}
@Test
public void testMulticastTopicToAnycastQueueBothAddress() throws Exception {
String address = "testBoth";
String clientId = "test-client-id";
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createBothTypeAddress(address);
exportMessages("topic://" + address, 0, true, clientId, file);
connection.start();
List<Message> messages = generateTextMessages(session, getTopicDestination(address));
exportMessages("topic://" + address, TEST_MESSAGE_COUNT, true, clientId, file);
importMessages(address, file);
checkSentMessages(session, messages, address);
}
@Test
public void testAnycastQueueToMulticastTopicBothAddress() throws Exception {
String address = "testBoth";
String clientId = "test-client-id";
File file = createMessageFile();
connection.setClientID(clientId);
createBothTypeAddress(address);
createQueue("--anycast", address, address);
Session session = createSession(connection);
TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(address), "test-subscriber");
List<Message> messages = generateTextMessages(session, address);
exportMessages(address, file);
importMessages("topic://" + address, file);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
TextMessage messageReceived = (TextMessage) subscriber.receive(1000);
assertEquals(((TextMessage) messages.get(i)).getText(), messageReceived.getText());
}
}
@ -285,8 +418,4 @@ public class MessageSerializerTest extends CliTestBase {
return lines;
}
private String getTestMessageBody() {
return "Sample Message";
}
}