ARTEMIS-4234 Fix some issue with Embedded Resource send and receive

The JUnit resource has a couple bugs in both the send and receive path
that result in only one message being received if the receive method is
called repeatedly and some send drop the provided properties. Cleaned up
some tests to the point of showing the errors and ensuring that at least
basic functionality is tested.
This commit is contained in:
Timothy Bish 2023-04-10 09:37:45 -04:00 committed by Robbie Gemmell
parent 8abdee29e9
commit 7892841fec
15 changed files with 718 additions and 234 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
@ -27,6 +28,9 @@ import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ActiveMQConsumerResourceTest {
@ -52,36 +56,64 @@ public class ActiveMQConsumerResourceTest {
@Rule
public RuleChain ruleChain = RuleChain.outerRule(server).around(consumer);
ClientMessage sent = null;
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
server.stop();
}
@Test
public void testSendBytes() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
@ -27,6 +28,9 @@ import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ActiveMQDynamicProducerResourceTest {
@ -52,45 +56,108 @@ public class ActiveMQDynamicProducerResourceTest {
@Rule
public RuleChain ruleChain = RuleChain.outerRule(server).around(producer);
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), receivedOne);
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
server.stop();
}
@Test
public void testSendBytes() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
final ClientMessage sentOne = producer.sendMessage(TEST_BODY.getBytes());
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
}
@Test
public void testSendString() throws Exception {
sentOne = producer.sendMessage(TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
final ClientMessage sentOne = producer.sendMessage(TEST_BODY);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
@Test
public void testSendBytesAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
@Test
public void testSendStringAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
}

View File

@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -51,8 +50,6 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
@Rule
public RuleChain ruleChain = RuleChain.outerRule(server).around(producer);
ClientMessage sentOne = null;
@Before
public void setUp() throws Exception {
producer.setAutoCreateQueue(false);
@ -61,21 +58,21 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
@Test(expected = IllegalArgumentException.class)
public void testSendBytesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
producer.sendMessage(TEST_BODY.getBytes());
}
@Test(expected = IllegalArgumentException.class)
public void testSendStringToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY);
producer.sendMessage(TEST_BODY);
}
@Test(expected = IllegalArgumentException.class)
public void testSendBytesAndPropertiesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test(expected = IllegalArgumentException.class)
public void testSendStringAndPropertiesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
@ -28,6 +29,9 @@ import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ActiveMQDynamicProducerResourceWithoutAddressTest {
@ -53,9 +57,6 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
@Rule
public RuleChain ruleChain = RuleChain.outerRule(server).around(producer);
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@Before
public void setUp() throws Exception {
producer.setAutoCreateQueue(false);
@ -65,40 +66,106 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), receivedOne);
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
server.stop();
}
@Test
public void testSendBytes() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes());
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
}
@Test
public void testSendString() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
@Test
public void testSendBytesAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
@Test
public void testSendStringAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY, TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
}

View File

@ -19,14 +19,17 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ActiveMQProducerResourceTest {
@ -52,35 +55,59 @@ public class ActiveMQProducerResourceTest {
@Rule
public RuleChain ruleChain = RuleChain.outerRule(server).around(producer);
ClientMessage sent = null;
@After
public void checkResults() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
}
@Test
public void testSendBytes() throws Exception {
sent = producer.sendMessage(TEST_BODY.getBytes());
final ClientMessage sent = producer.sendMessage(TEST_BODY.getBytes());
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() throws Exception {
sent = producer.sendMessage(TEST_BODY);
final ClientMessage sent = producer.sendMessage(TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sent = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
@ -28,6 +29,9 @@ import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class EmbeddedActiveMQResourceTest {
@ -51,8 +55,6 @@ public class EmbeddedActiveMQResourceTest {
@Rule
public RuleChain rulechain = RuleChain.outerRule(server);
ClientMessage sent = null;
@Before
public void setUp() throws Exception {
server.createQueue(TEST_ADDRESS, TEST_QUEUE);
@ -60,32 +62,66 @@ public class EmbeddedActiveMQResourceTest {
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
server.stop();
}
@Test
public void testSendBytes() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -19,15 +19,18 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
@ -55,34 +58,59 @@ public class ActiveMQConsumerResourceTest {
@Order(2)
public ActiveMQConsumerExtension consumer = new ActiveMQConsumerExtension(server.getVmURL(), TEST_QUEUE);
ClientMessage sent = null;
@AfterAll
public void tearDown() {
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
ClientMessage received = consumer.receiveMessage();
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
}
@Test
public void testSendBytes() {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = consumer.receiveMessage();
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = consumer.receiveMessage();
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendBytesAndProperties() {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = consumer.receiveMessage();
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = consumer.receiveMessage();
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -19,15 +19,18 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
@ -56,43 +59,113 @@ public class ActiveMQDynamicProducerResourceTest {
@Order(2)
public ActiveMQDynamicProducerExtension producer = new ActiveMQDynamicProducerExtension(server.getVmURL(), TEST_QUEUE_ONE);
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@Test
public void testSendBytes() {
final ClientMessage sentOne = producer.sendMessage(TEST_BODY.getBytes());
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
@AfterAll
public void tearDown() {
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(receivedOne, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(receivedTwo, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
}
@Test
public void testSendBytes() {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
}
@Test
public void testSendString() {
sentOne = producer.sendMessage(TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
final ClientMessage sentOne = producer.sendMessage(TEST_BODY);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
@Test
public void testSendBytesAndProperties() {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
@Test
public void testSendStringAndProperties() {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
}

View File

@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
@ -51,8 +50,6 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
@Order(2)
public ActiveMQDynamicProducerExtension producer = new ActiveMQDynamicProducerExtension(server.getVmURL());
ClientMessage sentOne = null;
@BeforeAll
public void setUp() throws Exception {
producer.setAutoCreateQueue(false);
@ -62,29 +59,28 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
@Test
public void testSendBytesToDefaultAddress() {
assertThrows(IllegalArgumentException.class, () -> {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
producer.sendMessage(TEST_BODY.getBytes());
});
}
@Test
public void testSendStringToDefaultAddress() {
assertThrows(IllegalArgumentException.class, () -> {
sentOne = producer.sendMessage(TEST_BODY);
producer.sendMessage(TEST_BODY);
});
}
@Test
public void testSendBytesAndPropertiesToDefaultAddress() {
assertThrows(IllegalArgumentException.class, () -> {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
});
}
@Test
public void testSendStringAndPropertiesToDefaultAddress() {
assertThrows(IllegalArgumentException.class, () -> {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
});
}
}

View File

@ -19,16 +19,19 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
@ -57,9 +60,6 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
@Order(2)
public ActiveMQDynamicProducerExtension producer = new ActiveMQDynamicProducerExtension(server.getVmURL());
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@BeforeAll
public void setUp() {
producer.setAutoCreateQueue(false);
@ -67,40 +67,115 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
server.createQueue(TEST_QUEUE_TWO, TEST_QUEUE_TWO);
}
@AfterAll
public void tearDown() {
@Test
public void testSendBytes() {
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes());
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(receivedOne, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(receivedTwo, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
}
@Test
public void testSendBytes() {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer receuvedBuffer = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[receuvedBuffer.readableBytes()];
receuvedBuffer.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
final ActiveMQBuffer receivedBuffer = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[receivedBuffer.readableBytes()];
receivedBuffer.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
}
@Test
public void testSendString() {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
}
@Test
public void testSendBytesAndProperties() {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
final ActiveMQBuffer receivedBuffer = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[receivedBuffer.readableBytes()];
receivedBuffer.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
final ActiveMQBuffer receivedBuffer = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[receivedBuffer.readableBytes()];
receivedBuffer.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
@Test
public void testSendStringAndProperties() {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY, TEST_PROPERTIES);
final ClientMessage sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
assertNotNull(sentOne, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE));
assertNotNull(sentTwo, String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}
}

View File

@ -19,15 +19,18 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
@ -55,34 +58,59 @@ public class ActiveMQProducerResourceTest {
@Order(2)
public ActiveMQDynamicProducerExtension producer = new ActiveMQDynamicProducerExtension(server.getVmURL(), TEST_ADDRESS);
ClientMessage sent = null;
@AfterAll
public void checkResults() {
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE));
}
@Test
public void testSendBytes() {
sent = producer.sendMessage(TEST_BODY.getBytes());
final ClientMessage sent = producer.sendMessage(TEST_BODY.getBytes());
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() {
sent = producer.sendMessage(TEST_BODY);
final ClientMessage sent = producer.sendMessage(TEST_BODY);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendBytesAndProperties() {
sent = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
final ClientMessage sent = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() {
sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -19,15 +19,18 @@ package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
@ -51,39 +54,85 @@ public class EmbeddedActiveMQResourceTest {
@RegisterExtension
public EmbeddedActiveMQExtension server = new EmbeddedActiveMQExtension();
ClientMessage sent = null;
@BeforeAll
public void setUp() {
server.createQueue(TEST_ADDRESS, TEST_QUEUE);
}
@AfterAll
public void tearDown() {
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
}
@Test
public void testSendBytes() {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
}
@Test
public void testSendString() {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
final ClientMessage sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
@Test
public void testSendTwoStringMesssages() {
final ClientMessage sent1 = server.sendMessage(TEST_ADDRESS, TEST_BODY);
assertNotNull(sent1, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage sent2 = server.sendMessage(TEST_ADDRESS, TEST_BODY + "-Second");
assertNotNull(sent2, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
}
{
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY + "-Second", received.getReadOnlyBodyBuffer().readString());
}
}
@Test
public void testSendBytesAndProperties() {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
final byte[] bodyBytes = TEST_BODY.getBytes();
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, bodyBytes, TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
final ActiveMQBuffer body = received.getReadOnlyBodyBuffer();
final byte[] receivedBody = new byte[body.readableBytes()];
body.readBytes(receivedBody);
assertArrayEquals(TEST_BODY.getBytes(), receivedBody);
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
@Test
public void testSendStringAndProperties() {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
final ClientMessage sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
assertNotNull(sent, String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS));
final ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(received, String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS));
assertEquals(TEST_BODY, received.getReadOnlyBodyBuffer().readString());
TEST_PROPERTIES.forEach((k, v) -> {
assertTrue(received.containsProperty(k));
assertEquals(v, received.getStringProperty(k));
});
}
}

View File

@ -137,16 +137,15 @@ public class ActiveMQDynamicProducerDelegate extends ActiveMQProducerDelegate
@Override
public ClientMessage sendMessage(SimpleString targetAddress, byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
ClientMessage message = createMessage(body, properties);
sendMessage(targetAddress, message);
return message;
}
@Override
public ClientMessage sendMessage(SimpleString targetAddress, String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
ClientMessage message = createMessage(body, properties);
sendMessage(targetAddress, message);
return message;
}
}

View File

@ -223,16 +223,15 @@ public class ActiveMQProducerDelegate extends AbstractActiveMQClientDelegate imp
@Override
public ClientMessage sendMessage(byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
ClientMessage message = createMessage(body, properties);
sendMessage(message);
return message;
}
@Override
public ClientMessage sendMessage(String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
ClientMessage message = createMessage(body, properties);
sendMessage(message);
return message;
}
}

View File

@ -471,7 +471,6 @@ public class EmbeddedActiveMQDelegate implements EmbeddedActiveMQOperations {
@Override
public ClientMessage sendMessageWithProperties(SimpleString address, String body, Map<String, Object> properties) {
ClientMessage message = createMessageWithProperties(body, properties);
sendMessage(address, message);
return message;
@ -626,43 +625,55 @@ public class EmbeddedActiveMQDelegate implements EmbeddedActiveMQOperations {
public ClientMessage receiveMessage(SimpleString address, long timeout, boolean browseOnly) {
checkSession();
ClientConsumer consumer = null;
try {
consumer = session.createConsumer(address, browseOnly);
EmbeddedActiveMQResourceException failureCause = null;
try (ClientConsumer consumer = session.createConsumer(address, browseOnly)) {
ClientMessage message = null;
if (timeout > 0) {
try {
message = consumer.receive(timeout);
} catch (ActiveMQException amqEx) {
failureCause = new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed",
timeout, address.toString()), amqEx);
throw failureCause;
}
} else if (timeout == 0) {
try {
message = consumer.receiveImmediate();
} catch (ActiveMQException amqEx) {
failureCause = new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed",
address.toString()), amqEx);
throw failureCause;
}
} else {
try {
message = consumer.receive();
} catch (ActiveMQException amqEx) {
failureCause = new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed",
address.toString()), amqEx);
throw failureCause;
}
}
if (message != null) {
try {
message.acknowledge();
} catch (ActiveMQException amqEx) {
failureCause = new EmbeddedActiveMQResourceException(String.format("ClientMessage.acknowledge() for %s from %s failed",
message, address.toString()), amqEx);
throw failureCause;
}
}
return message;
} catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to create consumer for %s",
address.toString()),
amqEx);
}
if (failureCause == null) {
failureCause = new EmbeddedActiveMQResourceException(String.format("Failed to create consumer for %s",
address.toString()), amqEx);
}
ClientMessage message = null;
if (timeout > 0) {
try {
message = consumer.receive(timeout);
} catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed",
timeout, address.toString()),
amqEx);
}
} else if (timeout == 0) {
try {
message = consumer.receiveImmediate();
} catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed",
address.toString()),
amqEx);
}
} else {
try {
message = consumer.receive();
} catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed",
address.toString()),
amqEx);
}
throw failureCause;
}
return message;
}
void checkSession() {