This closes #293

This commit is contained in:
Clebert Suconic 2016-01-05 10:44:54 -05:00
commit 94fb2c7b50
2 changed files with 158 additions and 1 deletions

View File

@ -1036,7 +1036,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
context.encode(bodyBuffer, localChunkLen);
byte[] body = bodyBuffer.toByteBuffer().array();
byte[] body;
if (bodyBuffer.toByteBuffer().hasArray()) {
body = bodyBuffer.toByteBuffer().array();
}
else {
body = new byte[0];
}
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);

View File

@ -2104,6 +2104,156 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
}
// https://issues.apache.org/jira/browse/ARTEMIS-331
@Test
public void testSendStreamingSingleEmptyMessage() throws Exception {
final String propertyName = "myStringPropertyName";
final String propertyValue = "myStringPropertyValue";
ClientSession session = null;
ActiveMQServer server = null;
final int SIZE = 0;
try {
server = createServer(true, isNetty());
server.start();
locator.setMinLargeMessageSize(100 * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true);
ClientMessage clientFile = session.createMessage(true);
clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
clientFile.putStringProperty(propertyName, propertyValue);
ClientProducer producer = session.createProducer(ADDRESS);
session.start();
log.debug("Sending");
producer.send(clientFile);
producer.close();
log.debug("Waiting");
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage msg2 = consumer.receive(10000);
msg2.acknowledge();
msg2.setOutputStream(createFakeOutputStream());
Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
Assert.assertEquals(propertyValue, msg2.getStringProperty(propertyName));
session.commit();
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
}
finally {
try {
session.close();
}
catch (Throwable ignored) {
}
try {
server.stop();
}
catch (Throwable ignored) {
}
}
}
// https://issues.apache.org/jira/browse/ARTEMIS-331
@Test
public void testSendStreamingEmptyMessagesWithRestart() throws Exception {
final String propertyName = "myStringPropertyName";
final String propertyValue = "myStringPropertyValue";
ClientSession session = null;
ActiveMQServer server = null;
final int SIZE = 0;
try {
server = createServer(true, isNetty());
server.start();
locator.setMinLargeMessageSize(100 * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true);
ClientProducer producer = session.createProducer(ADDRESS);
for (int i = 0; i < 10; i++) {
ClientMessage clientFile = session.createMessage(true);
clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
clientFile.putStringProperty(propertyName, propertyValue + i);
producer.send(clientFile);
}
producer.close();
session.close();
sf.close();
server.stop();
server.start();
sf = addSessionFactory(createSessionFactory(locator));
session = sf.createSession(null, null, false, true, true, false, 0);
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
for (int i = 0; i < 10; i++) {
ClientMessage msg2 = consumer.receive(10000);
msg2.acknowledge();
msg2.setOutputStream(createFakeOutputStream());
Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
Assert.assertEquals(propertyValue + i, msg2.getStringProperty(propertyName));
session.commit();
}
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
}
finally {
try {
session.close();
}
catch (Throwable ignored) {
}
try {
server.stop();
}
catch (Throwable ignored) {
}
}
}
/**
* Receive messages but never reads them, leaving the buffer pending
*/