Reverting commit to keep sync out of the client messages

This reverts commit e0c5499964.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-04-11 12:17:06 +00:00
parent b1c55fdc74
commit 837da7e582
1 changed files with 28 additions and 71 deletions

View File

@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
protected volatile String text; protected String text;
@Override @Override
public Message copy() { public Message copy() {
@ -55,11 +55,15 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
} }
private void copy(ActiveMQTextMessage copy) { private void copy(ActiveMQTextMessage copy) {
synchronized(this) { //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
//concurrent store and dispatch is enabled in KahaDB
//The issue is sometimes beforeMarshall() gets called in between the time content and
//text are copied to the new object leading to both fields being null when text should
//not be null
String text = this.text;
super.copy(copy); super.copy(copy);
copy.text = text; copy.text = text;
} }
}
@Override @Override
public byte getDataStructureType() { public byte getDataStructureType() {
@ -73,31 +77,22 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void setText(String text) throws MessageNotWriteableException { public void setText(String text) throws MessageNotWriteableException {
synchronized(this) {
checkReadOnlyBody(); checkReadOnlyBody();
this.text = text; this.text = text;
setContent(null); setContent(null);
} }
}
@Override @Override
public String getText() throws JMSException { public String getText() throws JMSException {
ByteSequence content; ByteSequence content = getContent();
String text; String text = this.text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (text == null && content != null) { if (text == null && content != null) {
text = decodeContent(content); text = decodeContent(content);
synchronized(this) {
this.text = text; this.text = text;
setContent(null); setContent(null);
setCompressed(false); setCompressed(false);
} }
}
return text; return text;
} }
@ -136,43 +131,16 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void storeContentAndClear() { public void storeContentAndClear() {
ByteSequence content; storeContent();
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
text=null; text=null;
} }
}
@Override @Override
public void storeContent() { public void storeContent() {
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
}
}
private ByteSequence marshallContent(String text) {
ByteSequence content = null;
try { try {
if (text != null) { ByteSequence content = getContent();
String text = this.text;
if (content == null && text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
OutputStream os = bytesOut; OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection(); ActiveMQConnection connection = getConnection();
@ -183,24 +151,20 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
DataOutputStream dataOut = new DataOutputStream(os); DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.writeUTF8(dataOut, text); MarshallingSupport.writeUTF8(dataOut, text);
dataOut.close(); dataOut.close();
content = bytesOut.toByteSequence(); setContent(bytesOut.toByteSequence());
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return content;
} }
// see https://issues.apache.org/activemq/browse/AMQ-2103 // see https://issues.apache.org/activemq/browse/AMQ-2103
// and https://issues.apache.org/activemq/browse/AMQ-2966 // and https://issues.apache.org/activemq/browse/AMQ-2966
@Override @Override
public void clearMarshalledState() throws JMSException { public void clearMarshalledState() throws JMSException {
synchronized(this) {
super.clearMarshalledState(); super.clearMarshalledState();
this.text = null; this.text = null;
} }
}
/** /**
* Clears out the message body. Clearing a message's body does not clear its * Clears out the message body. Clearing a message's body does not clear its
@ -215,20 +179,13 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
*/ */
@Override @Override
public void clearBody() throws JMSException { public void clearBody() throws JMSException {
synchronized(this) {
super.clearBody(); super.clearBody();
this.text = null; this.text = null;
} }
}
@Override @Override
public int getSize() { public int getSize() {
ByteSequence content; String text = this.text;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (size == 0 && content == null && text != null) { if (size == 0 && content == null && text != null) {
size = getMinimumMessageSize(); size = getMinimumMessageSize();
if (marshalledProperties != null) { if (marshalledProperties != null) {