From cb5c29d02d02dc7f7fa4f5c1a97bd2a59078bccd Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 13 Dec 2013 10:54:58 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4887 Fix for losing message content when reusing messages. --- .../command/ActiveMQBytesMessage.java | 36 +++++++++++ .../command/ActiveMQStreamMessage.java | 59 ++++++++++++++++++- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index 6de35aa78f..923e0e1095 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -856,6 +856,42 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag } this.dataOut = new DataOutputStream(os); } + + restoreOldContent(); + } + + private void restoreOldContent() throws JMSException { + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + ByteSequence toRestore = this.content; + if (compressed) { + InputStream is = new ByteArrayInputStream(toRestore); + int length = 0; + try { + DataInputStream dis = new DataInputStream(is); + length = dis.readInt(); + dis.close(); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + is = new InflaterInputStream(is); + DataInputStream input = new DataInputStream(is); + + byte[] buffer = new byte[length]; + input.readFully(buffer); + toRestore = new ByteSequence(buffer); + } + + this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength()); + // Free up the buffer from the old content, will be re-written when + // the message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } + } } protected void checkWriteOnlyBody() throws MessageNotReadableException { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java index f9dda6c3d1..f6e927ae3f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java @@ -118,6 +118,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess protected transient DataInputStream dataIn; protected transient int remainingBytes = -1; + @Override public Message copy() { ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); copy(copy); @@ -132,6 +133,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess copy.dataIn = null; } + @Override public void onSend() throws JMSException { super.onSend(); storeContent(); @@ -151,10 +153,12 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess } } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public String getJMSXMimeType() { return "jms/stream-message"; } @@ -171,6 +175,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * due to some internal error. */ + @Override public void clearBody() throws JMSException { super.clearBody(); this.dataOut = null; @@ -191,6 +196,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public boolean readBoolean() throws JMSException { initializeReading(); try { @@ -233,6 +239,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public byte readByte() throws JMSException { initializeReading(); try { @@ -282,6 +289,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public short readShort() throws JMSException { initializeReading(); try { @@ -335,6 +343,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public char readChar() throws JMSException { initializeReading(); try { @@ -382,6 +391,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readInt() throws JMSException { initializeReading(); try { @@ -438,6 +448,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public long readLong() throws JMSException { initializeReading(); try { @@ -496,6 +507,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public float readFloat() throws JMSException { initializeReading(); try { @@ -544,6 +556,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public double readDouble() throws JMSException { initializeReading(); try { @@ -596,6 +609,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public String readString() throws JMSException { initializeReading(); try { @@ -696,6 +710,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @see #readObject() */ + @Override public int readBytes(byte[] value) throws JMSException { initializeReading(); @@ -769,6 +784,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @see #readBytes(byte[] value) */ + @Override public Object readObject() throws JMSException { initializeReading(); try { @@ -849,6 +865,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBoolean(boolean value) throws JMSException { initializeWriting(); try { @@ -867,6 +884,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeByte(byte value) throws JMSException { initializeWriting(); try { @@ -885,6 +903,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeShort(short value) throws JMSException { initializeWriting(); try { @@ -903,6 +922,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeChar(char value) throws JMSException { initializeWriting(); try { @@ -921,6 +941,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeInt(int value) throws JMSException { initializeWriting(); try { @@ -939,6 +960,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeLong(long value) throws JMSException { initializeWriting(); try { @@ -957,6 +979,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeFloat(float value) throws JMSException { initializeWriting(); try { @@ -975,6 +998,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeDouble(double value) throws JMSException { initializeWriting(); try { @@ -993,6 +1017,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeString(String value) throws JMSException { initializeWriting(); try { @@ -1019,6 +1044,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value) throws JMSException { writeBytes(value, 0, value.length); } @@ -1039,6 +1065,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException { initializeWriting(); try { @@ -1062,6 +1089,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeObject(Object value) throws JMSException { initializeWriting(); if (value == null) { @@ -1102,6 +1130,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess * @throws JMSException if an internal error occurs */ + @Override public void reset() throws JMSException { storeContent(); this.bytesOut = null; @@ -1111,7 +1140,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess setReadOnlyBody(true); } - private void initializeWriting() throws MessageNotWriteableException { + private void initializeWriting() throws JMSException { checkReadOnlyBody(); if (this.dataOut == null) { this.bytesOut = new ByteArrayOutputStream(); @@ -1123,6 +1152,33 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess } this.dataOut = new DataOutputStream(os); } + + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + if (compressed) { + ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength()); + InflaterInputStream inflater = new InflaterInputStream(input); + try { + byte[] buffer = new byte[8*1024]; + int read = 0; + while ((read = inflater.read(buffer)) != -1) { + this.dataOut.write(buffer, 0, read); + } + } finally { + inflater.close(); + } + } else { + this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength()); + } + // Free up the buffer from the old content, will be re-written when + // tbe message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } + } } protected void checkWriteOnlyBody() throws MessageNotReadableException { @@ -1153,6 +1209,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess super.compress(); } + @Override public String toString() { return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; }