This commit is contained in:
Clebert Suconic 2018-09-12 20:59:13 -04:00
commit ec24ee4561
11 changed files with 151 additions and 89 deletions

View File

@ -18,10 +18,9 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter; import javax.xml.stream.XMLStreamWriter;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -34,7 +33,7 @@ import org.apache.activemq.artemis.reader.TextMessageUtil;
/** This is an Utility class that will import the outputs in XML format. */ /** This is an Utility class that will import the outputs in XML format. */
public class XMLMessageExporter { public class XMLMessageExporter {
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; private static final int LARGE_MESSAGE_CHUNK_SIZE = 1000;
private XMLStreamWriter xmlWriter; private XMLStreamWriter xmlWriter;
@ -70,6 +69,15 @@ public class XMLMessageExporter {
xmlWriter.writeEndElement(); // end MESSAGE_BODY xmlWriter.writeEndElement(); // end MESSAGE_BODY
} }
private static ByteBuffer acquireHeapBodyBuffer(ByteBuffer chunkBytes, int requiredCapacity) {
if (chunkBytes == null || chunkBytes.capacity() != requiredCapacity) {
chunkBytes = ByteBuffer.allocate(requiredCapacity);
} else {
chunkBytes.clear();
}
return chunkBytes;
}
public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeBodyEncoder encoder = null; LargeBodyEncoder encoder = null;
@ -78,18 +86,19 @@ public class XMLMessageExporter {
encoder = message.toCore().getBodyEncoder(); encoder = message.toCore().getBodyEncoder();
encoder.open(); encoder.open();
long totalBytesWritten = 0; long totalBytesWritten = 0;
Long bufferSize; int bufferSize;
long bodySize = encoder.getLargeBodySize(); long bodySize = encoder.getLargeBodySize();
ByteBuffer buffer = null;
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
Long remainder = bodySize - totalBytesWritten; long remainder = bodySize - totalBytesWritten;
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
bufferSize = LARGE_MESSAGE_CHUNK_SIZE; bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
} else { } else {
bufferSize = remainder; bufferSize = (int) remainder;
} }
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); buffer = acquireHeapBodyBuffer(buffer, bufferSize);
encoder.encode(buffer, bufferSize.intValue()); encoder.encode(buffer);
xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.array()));
totalBytesWritten += bufferSize; totalBytesWritten += bufferSize;
} }
encoder.close(); encoder.close();

View File

@ -21,8 +21,6 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -400,17 +398,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
} }
@Override @Override
public int encode(final ByteBuffer bufferRead) throws ActiveMQException { public int encode(final ByteBuffer bufferRead) {
ActiveMQBuffer buffer1 = ActiveMQBuffers.wrappedBuffer(bufferRead); final int remaining = bufferRead.remaining();
return encode(buffer1, bufferRead.capacity()); buffer.readBytes(bufferRead);
} return remaining;
@Override
public int encode(final ActiveMQBuffer bufferOut, final int size) {
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
bufferOut.writeBytes(bytes, 0, size);
return size;
} }
} }

View File

@ -18,10 +18,9 @@ package org.apache.activemq.artemis.core.client.impl;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -381,16 +380,18 @@ public class ClientProducerImpl implements ClientProducerInternal {
final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize); final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize);
final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); final ByteBuffer bodyBuffer = ByteBuffer.allocate(chunkLength);
context.encode(bodyBuffer, chunkLength); final int encodedSize = context.encode(bodyBuffer);
assert encodedSize == chunkLength;
pos += chunkLength; pos += chunkLength;
lastChunk = pos >= bodySize; lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
credits.acquireCredits(creditsUsed); credits.acquireCredits(creditsUsed);
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.message;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
/** /**
@ -43,11 +42,6 @@ public interface LargeBodyEncoder {
*/ */
int encode(ByteBuffer bufferRead) throws ActiveMQException; int encode(ByteBuffer bufferRead) throws ActiveMQException;
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
/** /**
* This method must not be called directly by ActiveMQ Artemis clients. * This method must not be called directly by ActiveMQ Artemis clients.
*/ */

View File

@ -254,14 +254,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException { private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
ActiveMQBuffer buffer;
LargeBodyEncoder encoder = getBodyEncoder(); LargeBodyEncoder encoder = getBodyEncoder();
encoder.open(); encoder.open();
int bodySize = (int) encoder.getLargeBodySize(); int bodySize = (int) encoder.getLargeBodySize();
final ActiveMQBuffer buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); buffer.byteBuf().ensureWritable(bodySize);
final ByteBuffer nioBuffer = buffer.byteBuf().internalNioBuffer(0, bodySize);
encoder.encode(buffer, bodySize); encoder.encode(nioBuffer);
buffer.writerIndex(bodySize);
encoder.close(); encoder.close();
return buffer; return buffer;
} }
@ -1154,16 +1154,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
@Override @Override
public int encode(final ByteBuffer bufferRead) throws ActiveMQException { public int encode(final ByteBuffer bufferRead) {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead); final int remaining = bufferRead.remaining();
return encode(buffer, bufferRead.capacity()); buffer.getBytes(lastPos, bufferRead);
} lastPos += remaining;
return remaining;
@Override
public int encode(final ActiveMQBuffer bufferOut, final int size) {
bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
lastPos += size;
return size;
} }
} }

View File

@ -783,6 +783,32 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
} }
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final ActiveMQBuffer bytes) throws Exception {
readLock();
try {
file.position(file.size());
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
file.writeDirect(nioBytes, false);
if (isReplicated()) {
//copy defensively bytes
final byte[] bytesCopy = new byte[bytes.readableBytes()];
bytes.getBytes(bytes.readerIndex(), bytesCopy);
replicator.largeMessageWrite(messageId, bytesCopy);
}
} else {
final byte[] bytesCopy = new byte[bytes.readableBytes()];
bytes.readBytes(bytesCopy);
addBytesToLargeMessage(file, messageId, bytesCopy);
}
} finally {
readUnLock();
}
}
@Override @Override
public final void addBytesToLargeMessage(final SequentialFile file, public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId, final long messageId,

View File

@ -53,10 +53,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
// We should only use the NIO implementation on the Journal // We should only use the NIO implementation on the Journal
private SequentialFile file; private SequentialFile file;
// set when a copyFrom is called
// The actual copy is done when finishCopy is called
private SequentialFile pendingCopy;
private long bodySize = -1; private long bodySize = -1;
private final AtomicInteger delayDeletionCount = new AtomicInteger(0); private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
@ -131,6 +127,21 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
bodySize += bytes.length; bodySize += bytes.length;
} }
@Override
public synchronized void addBytes(final ActiveMQBuffer bytes) throws Exception {
validateFile();
if (!file.isOpen()) {
file.open();
}
final int readableBytes = bytes.readableBytes();
storageManager.addBytesToLargeMessage(file, getMessageID(), bytes);
bodySize += readableBytes;
}
@Override @Override
public synchronized int getEncodeSize() { public synchronized int getEncodeSize() {
return getHeadersAndPropertiesEncodeSize(); return getHeadersAndPropertiesEncodeSize();
@ -488,22 +499,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
} }
} }
@Override
public int encode(final ActiveMQBuffer bufferOut, final int size) throws ActiveMQException {
// This could maybe be optimized (maybe reading directly into bufferOut)
ByteBuffer bufferRead = ByteBuffer.allocate(size);
int bytesRead = encode(bufferRead);
bufferRead.flip();
if (bytesRead > 0) {
bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
}
return bytesRead;
}
/* (non-Javadoc) /* (non-Javadoc)
* @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize() * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize()
*/ */
@ -512,4 +507,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
return getBodySize(); return getBodySize();
} }
} }
} }

View File

@ -48,11 +48,31 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe
buffer.writeBytes(bytes); buffer.writeBytes(bytes);
} }
@Override
public synchronized void addBytes(ActiveMQBuffer bytes) {
final int readableBytes = bytes.readableBytes();
if (buffer == null) {
buffer = Unpooled.buffer(readableBytes);
}
// expand the buffer
buffer.ensureWritable(readableBytes);
assert buffer.hasArray();
final int writerIndex = buffer.writerIndex();
bytes.readBytes(buffer.array(), buffer.arrayOffset() + writerIndex, readableBytes);
buffer.writerIndex(writerIndex + readableBytes);
}
@Override @Override
public synchronized ActiveMQBuffer getReadOnlyBodyBuffer() { public synchronized ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(0, buffer.writerIndex()).asReadOnly()); return new ChannelBufferWrapper(buffer.slice(0, buffer.writerIndex()).asReadOnly());
} }
@Override
public synchronized int getBodyBufferSize() {
return buffer.writerIndex();
}
@Override @Override
public void deleteFile() throws Exception { public void deleteFile() throws Exception {
// nothing to be done here.. we don really have a file on this Storage // nothing to be done here.. we don really have a file on this Storage

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
@ -28,6 +29,8 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage
@Override @Override
void addBytes(byte[] bytes) throws Exception; void addBytes(byte[] bytes) throws Exception;
void addBytes(ActiveMQBuffer bytes) throws Exception;
/** /**
* We have to copy the large message content in case of DLQ and paged messages * We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged * For that we need to pre-mark the LargeMessage with a flag when it is paged

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -1182,12 +1181,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private LargeBodyEncoder context; private LargeBodyEncoder context;
private ByteBuffer chunkBytes;
private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception { private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
largeMessage = message; largeMessage = message;
largeMessage.incrementDelayDeletionCount(); largeMessage.incrementDelayDeletionCount();
this.ref = ref; this.ref = ref;
this.chunkBytes = null;
}
private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) {
if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) {
this.chunkBytes = ByteBuffer.allocate(requiredCapacity);
} else {
this.chunkBytes.clear();
}
return this.chunkBytes;
}
private void releaseHeapBodyBuffer() {
this.chunkBytes = null;
} }
public boolean deliver() throws Exception { public boolean deliver() throws Exception {
@ -1207,7 +1223,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
availableCredits); availableCredits);
} }
releaseHeapBodyBuffer();
return false; return false;
} }
@ -1223,7 +1239,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
if (availableCredits != null) { if (availableCredits != null) {
availableCredits.addAndGet(-packetSize); final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) {
releaseHeapBodyBuffer();
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::" + logger.trace(this + "::FlowControl::" +
@ -1246,32 +1266,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
availableCredits); availableCredits);
} }
releaseHeapBodyBuffer();
return false; return false;
} }
int localChunkLen = 0; final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(localChunkLen); assert bodyBuffer.remaining() == localChunkLen;
context.encode(bodyBuffer, localChunkLen); final int readBytes = context.encode(bodyBuffer);
byte[] body; assert readBytes == localChunkLen;
if (bodyBuffer.toByteBuffer().hasArray()) { final byte[] body = bodyBuffer.array();
body = bodyBuffer.toByteBuffer().array();
} else { assert body.length == readBytes;
body = new byte[0];
} //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
//given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
//resendCache != null && packet.isRequiresConfirmations()
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
int chunkLen = body.length; int chunkLen = body.length;
if (availableCredits != null) { if (availableCredits != null) {
availableCredits.addAndGet(-packetSize); final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) {
releaseHeapBodyBuffer();
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
@ -1304,6 +1330,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void finish() throws Exception { public void finish() throws Exception {
synchronized (lock) { synchronized (lock) {
releaseHeapBodyBuffer();
if (largeMessage == null) { if (largeMessage == null) {
// handleClose could be calling close while handle is also calling finish. // handleClose could be calling close while handle is also calling finish.
// As a result one of them could get here after the largeMessage is already gone. // As a result one of them could get here after the largeMessage is already gone.

View File

@ -1398,13 +1398,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private LargeServerMessage messageToLargeMessage(Message message) throws Exception { private LargeServerMessage messageToLargeMessage(Message message) throws Exception {
ICoreMessage coreMessage = message.toCore(); ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage); LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage);
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
byte[] body = new byte[buffer.readableBytes()]; final int readableBytes = buffer.readableBytes();
buffer.readBytes(body); lsm.addBytes(buffer);
lsm.addBytes(body);
lsm.releaseResources(); lsm.releaseResources();
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length); lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm; return lsm;
} }