Add a storeConentAndClear() method to message so that we can lower the memory impact of embedded broker usage.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1481527 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-12 11:23:30 +00:00
parent b820ac2bf9
commit 30038957b6
6 changed files with 41 additions and 3 deletions

View File

@ -135,6 +135,12 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
map.clear();
}
@Override
public void storeContentAndClear() {
storeContent();
map.clear();
}
@Override
public void storeContent() {
try {

View File

@ -752,4 +752,9 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
@Override
public void storeContent() {
}
@Override
public void storeContentAndClear() {
storeContent();
}
}

View File

@ -89,6 +89,12 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
}
@Override
public void storeContentAndClear() {
storeContent();
object = null;
}
@Override
public void storeContent() {
ByteSequence bodyAsBytes = getContent();

View File

@ -74,6 +74,16 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public String getText() throws JMSException {
if (text == null && getContent() != null) {
text = decodeContent();
setContent(null);
setCompressed(false);
}
return text;
}
private String decodeContent() throws JMSException {
String text = null;
if (getContent() != null) {
InputStream is = null;
try {
ByteSequence bodyAsBytes = getContent();
@ -85,8 +95,6 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
DataInputStream dataIn = new DataInputStream(is);
text = MarshallingSupport.readUTF8(dataIn);
dataIn.close();
setContent(null);
setCompressed(false);
}
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
@ -108,6 +116,12 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
storeContent();
}
@Override
public void storeContentAndClear() {
storeContent();
text=null;
}
@Override
public void storeContent() {
try {
@ -166,7 +180,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public String toString() {
try {
String text = getText();
String text = this.text;
if( text == null ) {
text = decodeContent();
}
if (text != null) {
text = MarshallingSupport.truncate64(text);
HashMap<String, Object> overrideFields = new HashMap<String, Object>();

View File

@ -104,6 +104,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public abstract Message copy();
public abstract void clearBody() throws JMSException;
public abstract void storeContent();
public abstract void storeContentAndClear();
// useful to reduce the memory footprint of a persisted message
public void clearMarshalledState() throws JMSException {

View File

@ -257,6 +257,9 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val messageRecord = id.getDataLocator match {
case null =>
// encodes body and release object bodies, in case message was sent from
// a VM connection. Releases additional memory.
message.storeContentAndClear()
var packet = manager.parent.wireFormat.marshal(message)
var data = new Buffer(packet.data, packet.offset, packet.length)
if( manager.snappyCompressLogs ) {