https://issues.apache.org/jira/browse/AMQ-4182 - explicitly comporess/decompress byte messages, so we can avoid using finalize() to close streams

This commit is contained in:
Dejan Bosanac 2014-04-23 12:45:40 +02:00
parent fad1dd0f17
commit 44bb9fbeae
1 changed files with 90 additions and 105 deletions

View File

@ -23,8 +23,10 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.jms.BytesMessage;
@ -124,22 +126,29 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
@Override
public void storeContent() {
try {
if (dataOut != null) {
if (dataOut != null) {
try {
dataOut.close();
ByteSequence bs = bytesOut.toByteSequence();
if (compressed) {
int pos = bs.offset;
ByteSequenceData.writeIntBig(bs, length);
bs.offset = pos;
}
setContent(bs);
bytesOut = null;
dataOut = null;
if (compressed) {
doCompress();
}
} catch (IOException ioe) {
throw new RuntimeException(ioe.getMessage(), ioe);
} finally {
try {
if (bytesOut != null) {
bytesOut.close();
bytesOut = null;
}
if (dataOut != null) {
dataOut.close();
dataOut = null;
}
} catch (IOException ioe) {
}
}
} catch (IOException ioe) {
throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
// RuntimeException
}
}
@ -798,17 +807,23 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
@Override
public void reset() throws JMSException {
storeContent();
this.bytesOut = null;
if (dataIn != null) {
try {
// Eagerly release potential Inflater memory buffers.
dataIn.close();
} catch (Exception e) {
}
}
this.dataIn = null;
this.dataOut = null;
setReadOnlyBody(true);
try {
if (bytesOut != null) {
bytesOut.close();
bytesOut = null;
}
if (dataIn != null) {
dataIn.close();
dataIn = null;
}
if (dataOut != null) {
dataOut.close();
dataOut = null;
}
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
}
private void initializeWriting() throws JMSException {
@ -816,47 +831,14 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
if (this.dataOut == null) {
this.bytesOut = new ByteArrayOutputStream();
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
// keep track of the real length of the content if
// we are compressed.
try {
os.write(new byte[4]);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
length = 0;
compressed = true;
final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
@Override
public void write(byte[] arg0) throws IOException {
length += arg0.length;
out.write(arg0);
}
@Override
public void write(byte[] arg0, int arg1, int arg2) throws IOException {
length += arg2;
out.write(arg0, arg1, arg2);
}
@Override
public void write(int arg0) throws IOException {
length++;
out.write(arg0);
}
@Override
public void close() throws IOException {
super.close();
deflater.end();
}
};
}
this.dataOut = new DataOutputStream(os);
}
ActiveMQConnection connection = getConnection();
if (connection != null && connection.isUseCompression()) {
compressed = true;
}
restoreOldContent();
}
@ -867,21 +849,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
try {
ByteSequence toRestore = this.content;
if (compressed) {
InputStream is = new ByteArrayInputStream(toRestore);
int length = 0;
try {
DataInputStream dis = new DataInputStream(is);
length = dis.readInt();
dis.close();
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
is = new InflaterInputStream(is);
DataInputStream input = new DataInputStream(is);
byte[] buffer = new byte[length];
input.readFully(buffer);
toRestore = new ByteSequence(buffer);
toRestore = new ByteSequence(decompress(this.content));
}
this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
@ -903,26 +871,44 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
private void initializeReading() throws JMSException {
checkWriteOnlyBody();
if (dataIn == null) {
try {
ByteSequence data = getContent();
if (data == null) {
data = new ByteSequence(new byte[] {}, 0, 0);
}
InputStream is = new ByteArrayInputStream(data);
if (isCompressed()) {
// keep track of the real length of the content if
// we are compressed.
try {
DataInputStream dis = new DataInputStream(is);
length = dis.readInt();
dis.close();
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
if (data.length != 0) {
is = new ByteArrayInputStream(decompress(data));
}
is = new InflaterInputStream(is);
} else {
length = data.getLength();
}
dataIn = new DataInputStream(is);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
}
}
protected byte[] decompress(ByteSequence dataSequence) throws IOException {
Inflater inflater = new Inflater();
ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
try {
length = ByteSequenceData.readIntBig(dataSequence);
dataSequence.offset = 0;
byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
inflater.setInput(data);
byte[] buffer = new byte[length];
int count = inflater.inflate(buffer);
decompressed.write(buffer, 0, count);
return decompressed.toByteArray();
} catch (Exception e) {
throw new IOException(e);
} finally {
inflater.end();
decompressed.close();
}
}
@ -941,28 +927,27 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
protected void doCompress() throws IOException {
compressed = true;
ByteSequence bytes = getContent();
int length = bytes.getLength();
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
bytesOut.write(new byte[4]);
DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
DataOutputStream dataOut = new DataOutputStream(os);
dataOut.write(bytes.data, bytes.offset, bytes.length);
dataOut.flush();
dataOut.close();
bytes = bytesOut.toByteSequence();
ByteSequenceData.writeIntBig(bytes, length);
bytes.offset = 0;
setContent(bytes);
}
@Override
protected void finalize() throws Throwable {
// Attempt to do eager close in case of compressed data which uses a
// wrapped InflaterInputStream.
if (dataIn != null) {
if (bytes != null) {
int length = bytes.getLength();
ByteArrayOutputStream compressed = new ByteArrayOutputStream();
compressed.write(new byte[4]);
Deflater deflater = new Deflater();
try {
dataIn.close();
} catch(Exception e) {
deflater.setInput(bytes.data);
deflater.finish();
byte[] buffer = new byte[1024];
while (!deflater.finished()) {
int count = deflater.deflate(buffer);
compressed.write(buffer, 0, count);
}
bytes = compressed.toByteSequence();
ByteSequenceData.writeIntBig(bytes, length);
bytes.offset = 0;
setContent(bytes);
} finally {
deflater.end();
compressed.close();
}
}
}