This commit is contained in:
Clebert Suconic 2020-12-07 21:26:50 -05:00
commit 1fc5458ebb
9 changed files with 108 additions and 67 deletions

View File

@ -45,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.CompositeReadableBuffer;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
@ -102,6 +103,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
private StorageManager storageManager;
/** this is used to parse the initial packets from the buffer */
CompositeReadableBuffer parsingBuffer;
public AMQPLargeMessage(long id,
long messageFormat,
TypedProperties extraProperties,
@ -272,19 +276,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
@Override
public void finishParse() throws Exception {
openLargeMessage();
try {
this.ensureMessageDataScanned();
parsingData.rewind();
lazyDecodeApplicationProperties();
} finally {
closeLargeMessage();
}
}
@Override
public void validateFile() throws ActiveMQException {
largeBody.validateFile();
@ -344,12 +335,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
public void addBytes(ReadableBuffer data) throws Exception {
// We need to parse the header on the first add,
// as it will contain information if the message is durable or not
if (header == null && largeBody.getStoredBodySize() <= 0) {
parseHeader(data);
}
parseLargeMessage(data);
if (data.hasArray() && data.remaining() == data.array().length) {
//System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16));
@ -362,6 +348,63 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
protected void parseLargeMessage(ActiveMQBuffer data, boolean initialHeader) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
ByteBuf buffer = data.byteBuf().duplicate();
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}
byte[] parsingData = new byte[buffer.readableBytes()];
buffer.readBytes(parsingData);
parsingBuffer.append(parsingData);
if (!initialHeader) {
genericParseLargeMessage();
}
}
}
protected void parseLargeMessage(byte[] data, boolean initialHeader) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
byte[] copy = new byte[data.length];
System.arraycopy(data, 0, copy, 0, data.length);
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}
parsingBuffer.append(copy);
if (!initialHeader) {
genericParseLargeMessage();
}
}
}
private void genericParseLargeMessage() {
try {
parsingBuffer.position(0);
scanMessageData(parsingBuffer);
lazyDecodeApplicationProperties(parsingBuffer);
parsingBuffer = null;
} catch (RuntimeException expected) {
// this would mean the buffer is not complete yet, so we keep parsing it, until we can get enough bytes
logger.debug("The buffer for AMQP Large Mesasge was probably not complete, so an exception eventually would be expected", expected);
}
}
protected void parseLargeMessage(ReadableBuffer data) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
if (parsingBuffer == null) {
parsingBuffer = new CompositeReadableBuffer();
}
parsingBuffer.append(data.duplicate());
genericParseLargeMessage();
}
}
@Override
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
return getData().rewind();
@ -374,11 +417,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public void addBytes(byte[] bytes) throws Exception {
parseLargeMessage(bytes, false);
largeBody.addBytes(bytes);
}
@Override
public void addBytes(ActiveMQBuffer bytes) throws Exception {
public void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
parseLargeMessage(bytes, initialHeader);
largeBody.addBytes(bytes);
}
@ -471,7 +516,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
copy.finishParse();
copy.releaseResources(true);
return copy;

View File

@ -235,12 +235,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
this.coreMessageObjectPools = null;
}
/**
* Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing
* purposes to check the message data scanning status.<br>
* Its access is not thread-safe and it shouldn't return {@code null}.
*/
public final MessageDataScanningStatus messageDataScanned() {
public final MessageDataScanningStatus getDataScanningStatus() {
return MessageDataScanningStatus.valueOf(messageDataScanned);
}
@ -432,8 +427,12 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
protected <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes) {
return scanForMessageSection(getData().duplicate().position(0), scanStartPosition, targetTypes);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
protected <T> T scanForMessageSection(ReadableBuffer buffer, int scanStartPosition, Class...targetTypes) {
ensureMessageDataScanned();
// In cases where we parsed out enough to know the value is not encoded in the message
@ -442,7 +441,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return null;
}
ReadableBuffer buffer = getData().duplicate().position(0);
final DecoderImpl decoder = TLSEncode.getDecoder();
buffer.position(scanStartPosition);
@ -470,8 +468,15 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
protected ApplicationProperties lazyDecodeApplicationProperties() {
if (applicationProperties != null || applicationPropertiesPosition == VALUE_NOT_PRESENT) {
return applicationProperties;
}
return lazyDecodeApplicationProperties(getData().duplicate().position(0));
}
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
}
return applicationProperties;
@ -546,7 +551,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
// re-encode should be done to update the backing data with the in memory elements.
protected synchronized void ensureMessageDataScanned() {
final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned);
final MessageDataScanningStatus state = getDataScanningStatus();
switch (state) {
case NOT_SCANNED:
scanMessageData();
@ -583,14 +588,15 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
protected synchronized void scanMessageData() {
this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
scanMessageData(getData());
}
protected synchronized void scanMessageData(ReadableBuffer data) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(getData().rewind());
decoder.setBuffer(data);
resetMessageData();
ReadableBuffer data = getData();
try {
while (data.hasRemaining()) {
int constructorPos = data.position();
@ -634,6 +640,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
decoder.setByteBuffer(null);
data.rewind();
}
this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
}
@Override

View File

@ -250,7 +250,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
if (currentLargeMessage != null) {
currentLargeMessage.addBytes(receiver.recv());
receiver.advance();
currentLargeMessage.finishParse();
message = currentLargeMessage;
currentLargeMessage = null;
} else {

View File

@ -172,20 +172,20 @@ public class AMQPMessageTest {
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@ -205,20 +205,20 @@ public class AMQPMessageTest {
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@ -235,20 +235,20 @@ public class AMQPMessageTest {
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
}

View File

@ -336,7 +336,7 @@ public class LargeBody {
cloneFile.position(skipBytes);
if (newHeader != null) {
newMessage.addBytes(new ChannelBufferWrapper(newHeader));
newMessage.addBytes(new ChannelBufferWrapper(newHeader), true);
}
for (; ; ) {
// The buffer is reused...

View File

@ -57,11 +57,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
}
@Override
public void finishParse() throws Exception {
}
private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
@ -176,7 +171,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
@Override
public void addBytes(final ActiveMQBuffer bytes) throws Exception {
public void addBytes(final ActiveMQBuffer bytes, boolean initialHeader) throws Exception {
synchronized (largeBody) {
largeBody.addBytes(bytes);
}
@ -298,7 +293,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
newMessage.finishParse();
newMessage.releaseResources(true);
return newMessage.toMessage();

View File

@ -66,11 +66,6 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
buffer.writeBytes(bytes);
}
@Override
public void finishParse() throws Exception {
}
@Override
public void validateFile() throws ActiveMQException {
@ -82,7 +77,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
}
@Override
public synchronized void addBytes(ActiveMQBuffer bytes) {
public synchronized void addBytes(ActiveMQBuffer bytes, boolean initialHeader) {
final int readableBytes = bytes.readableBytes();
if (buffer == null) {
buffer = Unpooled.buffer(readableBytes);

View File

@ -34,7 +34,11 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
@Override
void addBytes(byte[] bytes) throws Exception;
void addBytes(ActiveMQBuffer bytes) throws Exception;
default void addBytes(ActiveMQBuffer bytes) throws Exception {
addBytes(bytes, false);
}
void addBytes(ActiveMQBuffer bytes, boolean initialHeader) throws Exception;
long getMessageID();
@ -67,6 +71,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
void setStorageManager(StorageManager storageManager);
void validateFile() throws ActiveMQException;
void finishParse() throws Exception;
}

View File

@ -59,7 +59,7 @@ public class AmqpJournalLoadingTest extends AmqpClientTestSupport {
final Message message = next.getMessage();
Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
amqpMessage = (AMQPMessage) message;
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.getDataScanningStatus());
}
AmqpClient client = createAmqpClient();
@ -75,7 +75,7 @@ public class AmqpJournalLoadingTest extends AmqpClientTestSupport {
assertEquals(1, afterRestartQueueView.getMessageCount());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.messageDataScanned());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.getDataScanningStatus());
receive.accept();