git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1413703 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-26 16:16:44 +00:00
parent f42d6bb1fb
commit c899492af8

View File

@ -29,8 +29,10 @@ import java.util.zip.InflaterInputStream;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException; import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException; import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayInputStream;
@ -99,6 +101,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
protected transient DataInputStream dataIn; protected transient DataInputStream dataIn;
protected transient int length; protected transient int length;
@Override
public Message copy() { public Message copy() {
ActiveMQBytesMessage copy = new ActiveMQBytesMessage(); ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
copy(copy); copy(copy);
@ -113,6 +116,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
copy.dataIn = null; copy.dataIn = null;
} }
@Override
public void onSend() throws JMSException { public void onSend() throws JMSException {
super.onSend(); super.onSend();
storeContent(); storeContent();
@ -139,10 +143,12 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
} }
} }
@Override
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
@Override
public String getJMSXMimeType() { public String getJMSXMimeType() {
return "jms/bytes-message"; return "jms/bytes-message";
} }
@ -158,6 +164,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* @throws JMSException if the JMS provider fails to clear the message body * @throws JMSException if the JMS provider fails to clear the message body
* due to some internal error. * due to some internal error.
*/ */
@Override
public void clearBody() throws JMSException { public void clearBody() throws JMSException {
super.clearBody(); super.clearBody();
this.dataOut = null; this.dataOut = null;
@ -178,6 +185,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* @since 1.1 * @since 1.1
*/ */
@Override
public long getBodyLength() throws JMSException { public long getBodyLength() throws JMSException {
initializeReading(); initializeReading();
return length; return length;
@ -193,6 +201,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public boolean readBoolean() throws JMSException { public boolean readBoolean() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -215,6 +224,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public byte readByte() throws JMSException { public byte readByte() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -237,6 +247,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readUnsignedByte() throws JMSException { public int readUnsignedByte() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -259,6 +270,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public short readShort() throws JMSException { public short readShort() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -281,6 +293,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readUnsignedShort() throws JMSException { public int readUnsignedShort() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -303,6 +316,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public char readChar() throws JMSException { public char readChar() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -325,6 +339,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readInt() throws JMSException { public int readInt() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -347,6 +362,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public long readLong() throws JMSException { public long readLong() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -369,6 +385,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public float readFloat() throws JMSException { public float readFloat() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -391,6 +408,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public double readDouble() throws JMSException { public double readDouble() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -418,6 +436,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* reached. * reached.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public String readUTF() throws JMSException { public String readUTF() throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -449,6 +468,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* some internal error. * some internal error.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readBytes(byte[] value) throws JMSException { public int readBytes(byte[] value) throws JMSException {
return readBytes(value, value.length); return readBytes(value, value.length);
} }
@ -479,6 +499,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* some internal error. * some internal error.
* @throws MessageNotReadableException if the message is in write-only mode. * @throws MessageNotReadableException if the message is in write-only mode.
*/ */
@Override
public int readBytes(byte[] value, int length) throws JMSException { public int readBytes(byte[] value, int length) throws JMSException {
initializeReading(); initializeReading();
try { try {
@ -512,6 +533,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBoolean(boolean value) throws JMSException { public void writeBoolean(boolean value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -530,6 +552,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeByte(byte value) throws JMSException { public void writeByte(byte value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -548,6 +571,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeShort(short value) throws JMSException { public void writeShort(short value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -566,6 +590,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeChar(char value) throws JMSException { public void writeChar(char value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -584,6 +609,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeInt(int value) throws JMSException { public void writeInt(int value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -602,6 +628,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeLong(long value) throws JMSException { public void writeLong(long value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -622,6 +649,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeFloat(float value) throws JMSException { public void writeFloat(float value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -642,6 +670,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeDouble(double value) throws JMSException { public void writeDouble(double value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -665,6 +694,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeUTF(String value) throws JMSException { public void writeUTF(String value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -682,6 +712,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBytes(byte[] value) throws JMSException { public void writeBytes(byte[] value) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -701,6 +732,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* to some internal error. * to some internal error.
* @throws MessageNotWriteableException if the message is in read-only mode. * @throws MessageNotWriteableException if the message is in read-only mode.
*/ */
@Override
public void writeBytes(byte[] value, int offset, int length) throws JMSException { public void writeBytes(byte[] value, int offset, int length) throws JMSException {
initializeWriting(); initializeWriting();
try { try {
@ -726,6 +758,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* @throws java.lang.NullPointerException if the parameter * @throws java.lang.NullPointerException if the parameter
* <code>value</code> is null. * <code>value</code> is null.
*/ */
@Override
public void writeObject(Object value) throws JMSException { public void writeObject(Object value) throws JMSException {
if (value == null) { if (value == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -762,9 +795,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
* *
* @throws JMSException if an internal error occurs * @throws JMSException if an internal error occurs
*/ */
@Override
public void reset() throws JMSException { public void reset() throws JMSException {
storeContent(); storeContent();
this.bytesOut = null; this.bytesOut = null;
if (dataIn != null) {
try {
// Eagerly release potential Inflater memory buffers.
dataIn.close();
} catch (Exception e) {
}
}
this.dataIn = null; this.dataIn = null;
this.dataOut = null; this.dataOut = null;
setReadOnlyBody(true); setReadOnlyBody(true);
@ -788,16 +829,19 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
compressed = true; compressed = true;
final Deflater deflater = new Deflater(Deflater.BEST_SPEED); final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) { os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
@Override
public void write(byte[] arg0) throws IOException { public void write(byte[] arg0) throws IOException {
length += arg0.length; length += arg0.length;
out.write(arg0); out.write(arg0);
} }
@Override
public void write(byte[] arg0, int arg1, int arg2) throws IOException { public void write(byte[] arg0, int arg1, int arg2) throws IOException {
length += arg2; length += arg2;
out.write(arg0, arg1, arg2); out.write(arg0, arg1, arg2);
} }
@Override
public void write(int arg0) throws IOException { public void write(int arg0) throws IOException {
length++; length++;
out.write(arg0); out.write(arg0);
@ -846,11 +890,13 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
} }
} }
@Override
public void setObjectProperty(String name, Object value) throws JMSException { public void setObjectProperty(String name, Object value) throws JMSException {
initializeWriting(); initializeWriting();
super.setObjectProperty(name, value); super.setObjectProperty(name, value);
} }
@Override
public String toString() { public String toString() {
return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
} }
@ -872,4 +918,16 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
bytes.offset = 0; bytes.offset = 0;
setContent(bytes); setContent(bytes);
} }
@Override
protected void finalize() throws Throwable {
// Attempt to do eager close in case of compressed data which uses a
// wrapped InflaterInputStream.
if (dataIn != null) {
try {
dataIn.close();
} catch(Exception e) {
}
}
}
} }