Synchronizing ActiveMQText message on state changes for the content and
text fields so that they are always changed together.  This will prevent
race conditions where data can be lost when using concurrent store and
dispatch.

(cherry picked from commit e0c5499964)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-03-22 13:41:20 +00:00
parent 93bc7030e2
commit 6c2a825ebb
1 changed files with 71 additions and 28 deletions

View File

@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
protected String text;
protected volatile String text;
@Override
public Message copy() {
@ -55,14 +55,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
}
private void copy(ActiveMQTextMessage copy) {
//AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
//concurrent store and dispatch is enabled in KahaDB
//The issue is sometimes beforeMarshall() gets called in between the time content and
//text are copied to the new object leading to both fields being null when text should
//not be null
String text = this.text;
super.copy(copy);
copy.text = text;
synchronized(this) {
super.copy(copy);
copy.text = text;
}
}
@Override
@ -77,21 +73,30 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override
public void setText(String text) throws MessageNotWriteableException {
checkReadOnlyBody();
this.text = text;
setContent(null);
synchronized(this) {
checkReadOnlyBody();
this.text = text;
setContent(null);
}
}
@Override
public String getText() throws JMSException {
ByteSequence content = getContent();
String text = this.text;
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (text == null && content != null) {
text = decodeContent(content);
this.text = text;
setContent(null);
setCompressed(false);
synchronized(this) {
this.text = text;
setContent(null);
setCompressed(false);
}
}
return text;
}
@ -131,16 +136,43 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override
public void storeContentAndClear() {
storeContent();
text=null;
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
text=null;
}
}
@Override
public void storeContent() {
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
}
}
private ByteSequence marshallContent(String text) {
ByteSequence content = null;
try {
ByteSequence content = getContent();
String text = this.text;
if (content == null && text != null) {
if (text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection();
@ -151,19 +183,23 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.writeUTF8(dataOut, text);
dataOut.close();
setContent(bytesOut.toByteSequence());
content = bytesOut.toByteSequence();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return content;
}
// see https://issues.apache.org/activemq/browse/AMQ-2103
// and https://issues.apache.org/activemq/browse/AMQ-2966
@Override
public void clearMarshalledState() throws JMSException {
super.clearMarshalledState();
this.text = null;
synchronized(this) {
super.clearMarshalledState();
this.text = null;
}
}
/**
@ -179,13 +215,20 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
*/
@Override
public void clearBody() throws JMSException {
super.clearBody();
this.text = null;
synchronized(this) {
super.clearBody();
this.text = null;
}
}
@Override
public int getSize() {
String text = this.text;
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (size == 0 && content == null && text != null) {
size = getMinimumMessageSize();
if (marshalledProperties != null) {