Synchronizing ActiveMQText message on state changes for the content and
text fields so that they are always changed together.  This will prevent
race conditions where data can be lost when using concurrent store and
dispatch.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-03-22 13:41:20 +00:00
parent ea09159a40
commit e0c5499964
1 changed files with 71 additions and 28 deletions

View File

@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
protected String text; protected volatile String text;
@Override @Override
public Message copy() { public Message copy() {
@ -55,14 +55,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
} }
private void copy(ActiveMQTextMessage copy) { private void copy(ActiveMQTextMessage copy) {
//AMQ-6218 - Save text before calling super.copy() to prevent a race condition when synchronized(this) {
//concurrent store and dispatch is enabled in KahaDB super.copy(copy);
//The issue is sometimes beforeMarshall() gets called in between the time content and copy.text = text;
//text are copied to the new object leading to both fields being null when text should }
//not be null
String text = this.text;
super.copy(copy);
copy.text = text;
} }
@Override @Override
@ -77,21 +73,30 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void setText(String text) throws MessageNotWriteableException { public void setText(String text) throws MessageNotWriteableException {
checkReadOnlyBody(); synchronized(this) {
this.text = text; checkReadOnlyBody();
setContent(null); this.text = text;
setContent(null);
}
} }
@Override @Override
public String getText() throws JMSException { public String getText() throws JMSException {
ByteSequence content = getContent(); ByteSequence content;
String text = this.text; String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (text == null && content != null) { if (text == null && content != null) {
text = decodeContent(content); text = decodeContent(content);
this.text = text; synchronized(this) {
setContent(null); this.text = text;
setCompressed(false); setContent(null);
setCompressed(false);
}
} }
return text; return text;
} }
@ -131,16 +136,43 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void storeContentAndClear() { public void storeContentAndClear() {
storeContent(); ByteSequence content;
text=null; String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
text=null;
}
} }
@Override @Override
public void storeContent() { public void storeContent() {
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
}
}
private ByteSequence marshallContent(String text) {
ByteSequence content = null;
try { try {
ByteSequence content = getContent(); if (text != null) {
String text = this.text;
if (content == null && text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
OutputStream os = bytesOut; OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection(); ActiveMQConnection connection = getConnection();
@ -151,19 +183,23 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
DataOutputStream dataOut = new DataOutputStream(os); DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.writeUTF8(dataOut, text); MarshallingSupport.writeUTF8(dataOut, text);
dataOut.close(); dataOut.close();
setContent(bytesOut.toByteSequence()); content = bytesOut.toByteSequence();
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return content;
} }
// see https://issues.apache.org/activemq/browse/AMQ-2103 // see https://issues.apache.org/activemq/browse/AMQ-2103
// and https://issues.apache.org/activemq/browse/AMQ-2966 // and https://issues.apache.org/activemq/browse/AMQ-2966
@Override @Override
public void clearMarshalledState() throws JMSException { public void clearMarshalledState() throws JMSException {
super.clearMarshalledState(); synchronized(this) {
this.text = null; super.clearMarshalledState();
this.text = null;
}
} }
/** /**
@ -179,13 +215,20 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
*/ */
@Override @Override
public void clearBody() throws JMSException { public void clearBody() throws JMSException {
super.clearBody(); synchronized(this) {
this.text = null; super.clearBody();
this.text = null;
}
} }
@Override @Override
public int getSize() { public int getSize() {
String text = this.text; ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (size == 0 && content == null && text != null) { if (size == 0 && content == null && text != null) {
size = getMinimumMessageSize(); size = getMinimumMessageSize();
if (marshalledProperties != null) { if (marshalledProperties != null) {