From c899492af8205ac007a9295608d3af77615eb75c Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 26 Nov 2012 16:16:44 +0000 Subject: [PATCH] some fixes for: https://issues.apache.org/jira/browse/AMQ-4182 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1413703 13f79535-47bb-0310-9956-ffa450edef68 --- .../command/ActiveMQBytesMessage.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index bda656cdb4..6de35aa78f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -29,8 +29,10 @@ import java.util.zip.InflaterInputStream; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.util.ByteArrayInputStream; @@ -99,6 +101,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag protected transient DataInputStream dataIn; protected transient int length; + @Override public Message copy() { ActiveMQBytesMessage copy = new ActiveMQBytesMessage(); copy(copy); @@ -113,6 +116,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag copy.dataIn = null; } + @Override public void onSend() throws JMSException { super.onSend(); storeContent(); @@ -139,10 +143,12 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag } } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public String getJMSXMimeType() { return "jms/bytes-message"; } @@ -158,6 +164,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * @throws JMSException if the JMS provider fails to clear the message body * due to some internal error. */ + @Override public void clearBody() throws JMSException { super.clearBody(); this.dataOut = null; @@ -178,6 +185,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * @since 1.1 */ + @Override public long getBodyLength() throws JMSException { initializeReading(); return length; @@ -193,6 +201,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public boolean readBoolean() throws JMSException { initializeReading(); try { @@ -215,6 +224,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public byte readByte() throws JMSException { initializeReading(); try { @@ -237,6 +247,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readUnsignedByte() throws JMSException { initializeReading(); try { @@ -259,6 +270,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public short readShort() throws JMSException { initializeReading(); try { @@ -281,6 +293,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readUnsignedShort() throws JMSException { initializeReading(); try { @@ -303,6 +316,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public char readChar() throws JMSException { initializeReading(); try { @@ -325,6 +339,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readInt() throws JMSException { initializeReading(); try { @@ -347,6 +362,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public long readLong() throws JMSException { initializeReading(); try { @@ -369,6 +385,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public float readFloat() throws JMSException { initializeReading(); try { @@ -391,6 +408,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public double readDouble() throws JMSException { initializeReading(); try { @@ -418,6 +436,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * reached. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public String readUTF() throws JMSException { initializeReading(); try { @@ -449,6 +468,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * some internal error. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readBytes(byte[] value) throws JMSException { return readBytes(value, value.length); } @@ -479,6 +499,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * some internal error. * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readBytes(byte[] value, int length) throws JMSException { initializeReading(); try { @@ -512,6 +533,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBoolean(boolean value) throws JMSException { initializeWriting(); try { @@ -530,6 +552,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeByte(byte value) throws JMSException { initializeWriting(); try { @@ -548,6 +571,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeShort(short value) throws JMSException { initializeWriting(); try { @@ -566,6 +590,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeChar(char value) throws JMSException { initializeWriting(); try { @@ -584,6 +609,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeInt(int value) throws JMSException { initializeWriting(); try { @@ -602,6 +628,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeLong(long value) throws JMSException { initializeWriting(); try { @@ -622,6 +649,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeFloat(float value) throws JMSException { initializeWriting(); try { @@ -642,6 +670,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeDouble(double value) throws JMSException { initializeWriting(); try { @@ -665,6 +694,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeUTF(String value) throws JMSException { initializeWriting(); try { @@ -682,6 +712,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value) throws JMSException { initializeWriting(); try { @@ -701,6 +732,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * to some internal error. * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException { initializeWriting(); try { @@ -726,6 +758,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * @throws java.lang.NullPointerException if the parameter * value is null. */ + @Override public void writeObject(Object value) throws JMSException { if (value == null) { throw new NullPointerException(); @@ -762,9 +795,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag * * @throws JMSException if an internal error occurs */ + @Override public void reset() throws JMSException { storeContent(); this.bytesOut = null; + if (dataIn != null) { + try { + // Eagerly release potential Inflater memory buffers. + dataIn.close(); + } catch (Exception e) { + } + } this.dataIn = null; this.dataOut = null; setReadOnlyBody(true); @@ -788,16 +829,19 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag compressed = true; final Deflater deflater = new Deflater(Deflater.BEST_SPEED); os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) { + @Override public void write(byte[] arg0) throws IOException { length += arg0.length; out.write(arg0); } + @Override public void write(byte[] arg0, int arg1, int arg2) throws IOException { length += arg2; out.write(arg0, arg1, arg2); } + @Override public void write(int arg0) throws IOException { length++; out.write(arg0); @@ -846,11 +890,13 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag } } + @Override public void setObjectProperty(String name, Object value) throws JMSException { initializeWriting(); super.setObjectProperty(name, value); } + @Override public String toString() { return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; } @@ -872,4 +918,16 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag bytes.offset = 0; setContent(bytes); } + + @Override + protected void finalize() throws Throwable { + // Attempt to do eager close in case of compressed data which uses a + // wrapped InflaterInputStream. + if (dataIn != null) { + try { + dataIn.close(); + } catch(Exception e) { + } + } + } }