This commit is contained in:
Clebert Suconic 2017-11-21 08:57:08 -05:00
commit 3fba3573a5
2 changed files with 37 additions and 5 deletions

View File

@ -19,12 +19,13 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@ -205,9 +206,10 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public ActiveMQBuffer getReadOnlyBodyBuffer() {
try {
file.open();
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size());
file.read(buffer.toByteBuffer());
return buffer;
int fileSize = (int) file.size();
ByteBuffer buffer = this.storageManager.largeMessagesFactory.newBuffer(fileSize);
file.read(buffer);
return new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
@ -215,7 +217,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
file.close();
} catch (Exception ignored) {
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -60,4 +61,34 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
producer.send(message);
}
}
@Test
public void testSendReceiveLargeMessage() throws Exception {
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(lmAddress.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Create 1MB Message
int size = 1024 * 1024;
byte[] bytes = new byte[size];
bytes[0] = 1;
BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
producer.send(message);
MessageConsumer consumer = session.createConsumer(queue);
BytesMessage m = (BytesMessage) consumer.receive();
assertNotNull(m);
byte[] body = new byte[size];
m.readBytes(body);
assertArrayEquals(body, bytes);
}
}
}