diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java index a2fbaddef8..f8063d1747 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java @@ -18,10 +18,9 @@ package org.apache.activemq.artemis.cli.commands.tools.xml; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; +import java.nio.ByteBuffer; import java.util.List; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; @@ -34,7 +33,7 @@ import org.apache.activemq.artemis.reader.TextMessageUtil; /** This is an Utility class that will import the outputs in XML format. */ public class XMLMessageExporter { - private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; + private static final int LARGE_MESSAGE_CHUNK_SIZE = 1000; private XMLStreamWriter xmlWriter; @@ -70,6 +69,15 @@ public class XMLMessageExporter { xmlWriter.writeEndElement(); // end MESSAGE_BODY } + private static ByteBuffer acquireHeapBodyBuffer(ByteBuffer chunkBytes, int requiredCapacity) { + if (chunkBytes == null || chunkBytes.capacity() != requiredCapacity) { + chunkBytes = ByteBuffer.allocate(requiredCapacity); + } else { + chunkBytes.clear(); + } + return chunkBytes; + } + public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); LargeBodyEncoder encoder = null; @@ -78,18 +86,19 @@ public class XMLMessageExporter { encoder = message.toCore().getBodyEncoder(); encoder.open(); long totalBytesWritten = 0; - Long bufferSize; + int bufferSize; long bodySize = encoder.getLargeBodySize(); + ByteBuffer buffer = null; for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { - Long remainder = bodySize - totalBytesWritten; + long remainder = bodySize - totalBytesWritten; if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { bufferSize = LARGE_MESSAGE_CHUNK_SIZE; } else { - bufferSize = remainder; + bufferSize = (int) remainder; } - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); - encoder.encode(buffer, bufferSize.intValue()); - xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); + buffer = acquireHeapBodyBuffer(buffer, bufferSize); + encoder.encode(buffer); + xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.array())); totalBytesWritten += bufferSize; } encoder.close(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index d14c64ee31..52ceb992dd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -21,8 +21,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; @@ -400,17 +398,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter } @Override - public int encode(final ByteBuffer bufferRead) throws ActiveMQException { - ActiveMQBuffer buffer1 = ActiveMQBuffers.wrappedBuffer(bufferRead); - return encode(buffer1, bufferRead.capacity()); - } - - @Override - public int encode(final ActiveMQBuffer bufferOut, final int size) { - byte[] bytes = new byte[size]; - buffer.readBytes(bytes); - bufferOut.writeBytes(bytes, 0, size); - return size; + public int encode(final ByteBuffer bufferRead) { + final int remaining = bufferRead.remaining(); + buffer.readBytes(bufferRead); + return remaining; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 0ad1999316..5cda0c433a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -18,10 +18,9 @@ package org.apache.activemq.artemis.core.client.impl; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; @@ -381,16 +380,18 @@ public class ClientProducerImpl implements ClientProducerInternal { final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize); - final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); + final ByteBuffer bodyBuffer = ByteBuffer.allocate(chunkLength); - context.encode(bodyBuffer, chunkLength); + final int encodedSize = context.encode(bodyBuffer); + + assert encodedSize == chunkLength; pos += chunkLength; lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler); credits.acquireCredits(creditsUsed); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java index 7a248b4b97..87e5ba6cec 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.message; import java.nio.ByteBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; /** @@ -43,11 +42,6 @@ public interface LargeBodyEncoder { */ int encode(ByteBuffer bufferRead) throws ActiveMQException; - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; - /** * This method must not be called directly by ActiveMQ Artemis clients. */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 5272200e01..cc79c2c830 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -254,14 +254,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException { - ActiveMQBuffer buffer; LargeBodyEncoder encoder = getBodyEncoder(); encoder.open(); int bodySize = (int) encoder.getLargeBodySize(); - - buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); - - encoder.encode(buffer, bodySize); + final ActiveMQBuffer buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); + buffer.byteBuf().ensureWritable(bodySize); + final ByteBuffer nioBuffer = buffer.byteBuf().internalNioBuffer(0, bodySize); + encoder.encode(nioBuffer); + buffer.writerIndex(bodySize); encoder.close(); return buffer; } @@ -1154,16 +1154,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public int encode(final ByteBuffer bufferRead) throws ActiveMQException { - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead); - return encode(buffer, bufferRead.capacity()); - } - - @Override - public int encode(final ActiveMQBuffer bufferOut, final int size) { - bufferOut.byteBuf().writeBytes(buffer, lastPos, size); - lastPos += size; - return size; + public int encode(final ByteBuffer bufferRead) { + final int remaining = bufferRead.remaining(); + buffer.getBytes(lastPos, bufferRead); + lastPos += remaining; + return remaining; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 867f2d43ec..1d954c8b16 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -783,6 +783,32 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } } + public final void addBytesToLargeMessage(final SequentialFile file, + final long messageId, + final ActiveMQBuffer bytes) throws Exception { + readLock(); + try { + file.position(file.size()); + if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) { + final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); + file.writeDirect(nioBytes, false); + + if (isReplicated()) { + //copy defensively bytes + final byte[] bytesCopy = new byte[bytes.readableBytes()]; + bytes.getBytes(bytes.readerIndex(), bytesCopy); + replicator.largeMessageWrite(messageId, bytesCopy); + } + } else { + final byte[] bytesCopy = new byte[bytes.readableBytes()]; + bytes.readBytes(bytesCopy); + addBytesToLargeMessage(file, messageId, bytesCopy); + } + } finally { + readUnLock(); + } + } + @Override public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 257141ebb2..110070c204 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -53,10 +53,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe // We should only use the NIO implementation on the Journal private SequentialFile file; - // set when a copyFrom is called - // The actual copy is done when finishCopy is called - private SequentialFile pendingCopy; - private long bodySize = -1; private final AtomicInteger delayDeletionCount = new AtomicInteger(0); @@ -131,6 +127,21 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe bodySize += bytes.length; } + @Override + public synchronized void addBytes(final ActiveMQBuffer bytes) throws Exception { + validateFile(); + + if (!file.isOpen()) { + file.open(); + } + + final int readableBytes = bytes.readableBytes(); + + storageManager.addBytesToLargeMessage(file, getMessageID(), bytes); + + bodySize += readableBytes; + } + @Override public synchronized int getEncodeSize() { return getHeadersAndPropertiesEncodeSize(); @@ -488,22 +499,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe } } - @Override - public int encode(final ActiveMQBuffer bufferOut, final int size) throws ActiveMQException { - // This could maybe be optimized (maybe reading directly into bufferOut) - ByteBuffer bufferRead = ByteBuffer.allocate(size); - - int bytesRead = encode(bufferRead); - - bufferRead.flip(); - - if (bytesRead > 0) { - bufferOut.writeBytes(bufferRead.array(), 0, bytesRead); - } - - return bytesRead; - } - /* (non-Javadoc) * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize() */ @@ -512,4 +507,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe return getBodySize(); } } + + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 6280746281..d709e28e2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -48,11 +48,31 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe buffer.writeBytes(bytes); } + @Override + public synchronized void addBytes(ActiveMQBuffer bytes) { + final int readableBytes = bytes.readableBytes(); + if (buffer == null) { + buffer = Unpooled.buffer(readableBytes); + } + + // expand the buffer + buffer.ensureWritable(readableBytes); + assert buffer.hasArray(); + final int writerIndex = buffer.writerIndex(); + bytes.readBytes(buffer.array(), buffer.arrayOffset() + writerIndex, readableBytes); + buffer.writerIndex(writerIndex + readableBytes); + } + @Override public synchronized ActiveMQBuffer getReadOnlyBodyBuffer() { return new ChannelBufferWrapper(buffer.slice(0, buffer.writerIndex()).asReadOnly()); } + @Override + public synchronized int getBodyBufferSize() { + return buffer.writerIndex(); + } + @Override public void deleteFile() throws Exception { // nothing to be done here.. we don really have a file on this Storage diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index a80e3692b3..69592bb70d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -28,6 +29,8 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage @Override void addBytes(byte[] bytes) throws Exception; + void addBytes(ActiveMQBuffer bytes) throws Exception; + /** * We have to copy the large message content in case of DLQ and paged messages * For that we need to pre-mark the LargeMessage with a flag when it is paged diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index ddd0b71720..470aeb658e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.impl; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; @@ -1182,12 +1181,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private LargeBodyEncoder context; + private ByteBuffer chunkBytes; + private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception { largeMessage = message; largeMessage.incrementDelayDeletionCount(); this.ref = ref; + + this.chunkBytes = null; + } + + private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) { + if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) { + this.chunkBytes = ByteBuffer.allocate(requiredCapacity); + } else { + this.chunkBytes.clear(); + } + return this.chunkBytes; + } + + private void releaseHeapBodyBuffer() { + this.chunkBytes = null; } public boolean deliver() throws Exception { @@ -1207,7 +1223,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits); } - + releaseHeapBodyBuffer(); return false; } @@ -1223,7 +1239,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); if (availableCredits != null) { - availableCredits.addAndGet(-packetSize); + final int credits = availableCredits.addAndGet(-packetSize); + + if (credits <= 0) { + releaseHeapBodyBuffer(); + } if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::" + @@ -1246,32 +1266,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits); } - + releaseHeapBodyBuffer(); return false; } - int localChunkLen = 0; + final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); - localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); + final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen); - ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(localChunkLen); + assert bodyBuffer.remaining() == localChunkLen; - context.encode(bodyBuffer, localChunkLen); + final int readBytes = context.encode(bodyBuffer); - byte[] body; + assert readBytes == localChunkLen; - if (bodyBuffer.toByteBuffer().hasArray()) { - body = bodyBuffer.toByteBuffer().array(); - } else { - body = new byte[0]; - } + final byte[] body = bodyBuffer.array(); + + assert body.length == readBytes; + + //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation + //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if + //resendCache != null && packet.isRequiresConfirmations() int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); int chunkLen = body.length; if (availableCredits != null) { - availableCredits.addAndGet(-packetSize); + final int credits = availableCredits.addAndGet(-packetSize); + + if (credits <= 0) { + releaseHeapBodyBuffer(); + } if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + @@ -1304,6 +1330,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public void finish() throws Exception { synchronized (lock) { + releaseHeapBodyBuffer(); + if (largeMessage == null) { // handleClose could be calling close while handle is also calling finish. // As a result one of them could get here after the largeMessage is already gone. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d868a2fb91..3354d3d7b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1398,13 +1398,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private LargeServerMessage messageToLargeMessage(Message message) throws Exception { ICoreMessage coreMessage = message.toCore(); LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage); - ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); - byte[] body = new byte[buffer.readableBytes()]; - buffer.readBytes(body); - lsm.addBytes(body); + final int readableBytes = buffer.readableBytes(); + lsm.addBytes(buffer); lsm.releaseResources(); - lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length); + lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes); return lsm; }