Fix for losing message content when reusing messages.
This commit is contained in:
Timothy Bish 2013-12-13 10:54:58 -05:00
parent dcedd9fe96
commit cb5c29d02d
2 changed files with 94 additions and 1 deletions

View File

@ -856,6 +856,42 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
} }
this.dataOut = new DataOutputStream(os); this.dataOut = new DataOutputStream(os);
} }
restoreOldContent();
}
private void restoreOldContent() throws JMSException {
// For a message that already had a body and was sent we need to restore the content
// if the message is used again without having its clearBody method called.
if (this.content != null && this.content.length > 0) {
try {
ByteSequence toRestore = this.content;
if (compressed) {
InputStream is = new ByteArrayInputStream(toRestore);
int length = 0;
try {
DataInputStream dis = new DataInputStream(is);
length = dis.readInt();
dis.close();
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
is = new InflaterInputStream(is);
DataInputStream input = new DataInputStream(is);
byte[] buffer = new byte[length];
input.readFully(buffer);
toRestore = new ByteSequence(buffer);
}
this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
// Free up the buffer from the old content, will be re-written when
// the message is sent again and storeContent() is called.
this.content = null;
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
}
} }
protected void checkWriteOnlyBody() throws MessageNotReadableException { protected void checkWriteOnlyBody() throws MessageNotReadableException {

View File

@ -118,6 +118,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
protected transient DataInputStream dataIn; protected transient DataInputStream dataIn;
protected transient int remainingBytes = -1; protected transient int remainingBytes = -1;
@Override
public Message copy() { public Message copy() {
ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
copy(copy); copy(copy);
@ -132,6 +133,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
copy.dataIn = null; copy.dataIn = null;
} }
@Override
public void onSend() throws JMSException { public void onSend() throws JMSException {
super.onSend(); super.onSend();
storeContent(); storeContent();
@ -151,10 +153,12 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
} }
} }
@Override
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
@Override
public String getJMSXMimeType() { public String getJMSXMimeType() {
return "jms/stream-message"; return "jms/stream-message";
} }
@ -171,6 +175,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* due to some internal error. * due to some internal error.
*/ */
@Override
public void clearBody() throws JMSException { public void clearBody() throws JMSException {
super.clearBody(); super.clearBody();
this.dataOut = null; this.dataOut = null;
@ -191,6 +196,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public boolean readBoolean() throws JMSException { public boolean readBoolean() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -233,6 +239,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public byte readByte() throws JMSException { public byte readByte() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -282,6 +289,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public short readShort() throws JMSException { public short readShort() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -335,6 +343,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public char readChar() throws JMSException { public char readChar() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -382,6 +391,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readInt() throws JMSException { public int readInt() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -438,6 +448,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public long readLong() throws JMSException { public long readLong() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -496,6 +507,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public float readFloat() throws JMSException { public float readFloat() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -544,6 +556,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public double readDouble() throws JMSException { public double readDouble() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -596,6 +609,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public String readString() throws JMSException { public String readString() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -696,6 +710,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @see #readObject() * @see #readObject()
*/ */
@Override
public int readBytes(byte[] value) throws JMSException { public int readBytes(byte[] value) throws JMSException {
initializeReading(); initializeReading();
@ -769,6 +784,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @see #readBytes(byte[] value) * @see #readBytes(byte[] value)
*/ */
@Override
public Object readObject() throws JMSException { public Object readObject() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -849,6 +865,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBoolean(boolean value) throws JMSException { public void writeBoolean(boolean value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -867,6 +884,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeByte(byte value) throws JMSException { public void writeByte(byte value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -885,6 +903,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeShort(short value) throws JMSException { public void writeShort(short value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -903,6 +922,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeChar(char value) throws JMSException { public void writeChar(char value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -921,6 +941,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeInt(int value) throws JMSException { public void writeInt(int value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -939,6 +960,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeLong(long value) throws JMSException { public void writeLong(long value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -957,6 +979,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeFloat(float value) throws JMSException { public void writeFloat(float value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -975,6 +998,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeDouble(double value) throws JMSException { public void writeDouble(double value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -993,6 +1017,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeString(String value) throws JMSException { public void writeString(String value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -1019,6 +1044,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBytes(byte[] value) throws JMSException { public void writeBytes(byte[] value) throws JMSException {
writeBytes(value, 0, value.length); writeBytes(value, 0, value.length);
} }
@ -1039,6 +1065,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBytes(byte[] value, int offset, int length) throws JMSException { public void writeBytes(byte[] value, int offset, int length) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -1062,6 +1089,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeObject(Object value) throws JMSException { public void writeObject(Object value) throws JMSException {
initializeWriting(); initializeWriting();
if (value == null) { if (value == null) {
@ -1102,6 +1130,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
* @throws JMSException if an internal error occurs * @throws JMSException if an internal error occurs
*/ */
@Override
public void reset() throws JMSException { public void reset() throws JMSException {
storeContent(); storeContent();
this.bytesOut = null; this.bytesOut = null;
@ -1111,7 +1140,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
setReadOnlyBody(true); setReadOnlyBody(true);
} }
private void initializeWriting() throws MessageNotWriteableException { private void initializeWriting() throws JMSException {
checkReadOnlyBody(); checkReadOnlyBody();
if (this.dataOut == null) { if (this.dataOut == null) {
this.bytesOut = new ByteArrayOutputStream(); this.bytesOut = new ByteArrayOutputStream();
@ -1123,6 +1152,33 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
} }
this.dataOut = new DataOutputStream(os); this.dataOut = new DataOutputStream(os);
} }
// For a message that already had a body and was sent we need to restore the content
// if the message is used again without having its clearBody method called.
if (this.content != null && this.content.length > 0) {
try {
if (compressed) {
ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength());
InflaterInputStream inflater = new InflaterInputStream(input);
try {
byte[] buffer = new byte[8*1024];
int read = 0;
while ((read = inflater.read(buffer)) != -1) {
this.dataOut.write(buffer, 0, read);
}
} finally {
inflater.close();
}
} else {
this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength());
}
// Free up the buffer from the old content, will be re-written when
// tbe message is sent again and storeContent() is called.
this.content = null;
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
}
} }
protected void checkWriteOnlyBody() throws MessageNotReadableException { protected void checkWriteOnlyBody() throws MessageNotReadableException {
@ -1153,6 +1209,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
super.compress(); super.compress();
} }
@Override
public String toString() { public String toString() {
return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
} }