NO-JIRA Adding CoreMessage Test with Bytes access

This commit is contained in:
Clebert Suconic 2021-03-23 10:03:18 -04:00
parent bb94d0a5b3
commit 89ea4ecda7
1 changed files with 51 additions and 0 deletions

View File

@ -19,8 +19,13 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.UUID; import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -82,4 +87,50 @@ public class MessageBufferTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString()); assertEquals(data, message.getBodyBuffer().readString());
} }
@Test
public void simpleTestBytes() throws Exception {
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(1500);
for (int i = 0; i < 1024; i++) {
buf.writeByte(getSamplebyte(i));
}
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(addressName);
{
ClientMessage message = (ClientMessageImpl) session.createMessage(true);
Assert.assertEquals(1024, buf.readableBytes());
message.getBodyBuffer().writeBytes(buf, 0, buf.readableBytes());
producer.send(message);
}
session.commit();
producer.close();
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
{
ClientMessage message = consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(1024, message.getBodySize());
ActiveMQBuffer buffer = message.getDataBuffer();
Assert.assertEquals(1024, message.getBodySize());
ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
Assert.assertEquals(1024, message.getBodySize());
ActiveMQBuffer result = ActiveMQBuffers.fixedBuffer(message.getBodyBufferSize());
buffer.readBytes(result);
Assert.assertEquals(1024, result.readableBytes());
for (int i = 0; i < 1024; i++) {
Assert.assertEquals(getSamplebyte(i), result.readByte());
}
assertNotNull(message);
message.acknowledge();
}
}
} }