ARTEMIS-1514 Fix read large message into buffer
This commit is contained in:
parent
9a8055bd3f
commit
eddb144deb
|
@ -19,12 +19,13 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
|
@ -205,9 +206,10 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
|
||||||
public ActiveMQBuffer getReadOnlyBodyBuffer() {
|
public ActiveMQBuffer getReadOnlyBodyBuffer() {
|
||||||
try {
|
try {
|
||||||
file.open();
|
file.open();
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size());
|
int fileSize = (int) file.size();
|
||||||
file.read(buffer.toByteBuffer());
|
ByteBuffer buffer = this.storageManager.largeMessagesFactory.newBuffer(fileSize);
|
||||||
return buffer;
|
file.read(buffer);
|
||||||
|
return new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -215,7 +217,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
|
||||||
file.close();
|
file.close();
|
||||||
} catch (Exception ignored) {
|
} catch (Exception ignored) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -60,4 +61,34 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendReceiveLargeMessage() throws Exception {
|
||||||
|
try (Connection connection = factory.createConnection()) {
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(lmAddress.toString());
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
// Create 1MB Message
|
||||||
|
int size = 1024 * 1024;
|
||||||
|
byte[] bytes = new byte[size];
|
||||||
|
bytes[0] = 1;
|
||||||
|
|
||||||
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
message.writeBytes(bytes);
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
BytesMessage m = (BytesMessage) consumer.receive();
|
||||||
|
assertNotNull(m);
|
||||||
|
|
||||||
|
byte[] body = new byte[size];
|
||||||
|
m.readBytes(body);
|
||||||
|
|
||||||
|
assertArrayEquals(body, bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue