This reverts commit 6c2a825ebb.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-04-11 12:18:51 +00:00
parent 6c79298541
commit 078da5e7f8
1 changed files with 28 additions and 71 deletions

View File

@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
protected volatile String text; protected String text;
@Override @Override
public Message copy() { public Message copy() {
@ -55,10 +55,14 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
} }
private void copy(ActiveMQTextMessage copy) { private void copy(ActiveMQTextMessage copy) {
synchronized(this) { //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
super.copy(copy); //concurrent store and dispatch is enabled in KahaDB
copy.text = text; //The issue is sometimes beforeMarshall() gets called in between the time content and
} //text are copied to the new object leading to both fields being null when text should
//not be null
String text = this.text;
super.copy(copy);
copy.text = text;
} }
@Override @Override
@ -73,30 +77,21 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void setText(String text) throws MessageNotWriteableException { public void setText(String text) throws MessageNotWriteableException {
synchronized(this) { checkReadOnlyBody();
checkReadOnlyBody(); this.text = text;
this.text = text; setContent(null);
setContent(null);
}
} }
@Override @Override
public String getText() throws JMSException { public String getText() throws JMSException {
ByteSequence content; ByteSequence content = getContent();
String text; String text = this.text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (text == null && content != null) { if (text == null && content != null) {
text = decodeContent(content); text = decodeContent(content);
synchronized(this) { this.text = text;
this.text = text; setContent(null);
setContent(null); setCompressed(false);
setCompressed(false);
}
} }
return text; return text;
} }
@ -136,43 +131,16 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
@Override @Override
public void storeContentAndClear() { public void storeContentAndClear() {
ByteSequence content; storeContent();
String text; text=null;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
text=null;
}
} }
@Override @Override
public void storeContent() { public void storeContent() {
ByteSequence content;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (content == null && text != null) {
content = marshallContent(text);
}
synchronized(this) {
setContent(content);
}
}
private ByteSequence marshallContent(String text) {
ByteSequence content = null;
try { try {
if (text != null) { ByteSequence content = getContent();
String text = this.text;
if (content == null && text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
OutputStream os = bytesOut; OutputStream os = bytesOut;
ActiveMQConnection connection = getConnection(); ActiveMQConnection connection = getConnection();
@ -183,23 +151,19 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
DataOutputStream dataOut = new DataOutputStream(os); DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.writeUTF8(dataOut, text); MarshallingSupport.writeUTF8(dataOut, text);
dataOut.close(); dataOut.close();
content = bytesOut.toByteSequence(); setContent(bytesOut.toByteSequence());
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return content;
} }
// see https://issues.apache.org/activemq/browse/AMQ-2103 // see https://issues.apache.org/activemq/browse/AMQ-2103
// and https://issues.apache.org/activemq/browse/AMQ-2966 // and https://issues.apache.org/activemq/browse/AMQ-2966
@Override @Override
public void clearMarshalledState() throws JMSException { public void clearMarshalledState() throws JMSException {
synchronized(this) { super.clearMarshalledState();
super.clearMarshalledState(); this.text = null;
this.text = null;
}
} }
/** /**
@ -215,20 +179,13 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
*/ */
@Override @Override
public void clearBody() throws JMSException { public void clearBody() throws JMSException {
synchronized(this) { super.clearBody();
super.clearBody(); this.text = null;
this.text = null;
}
} }
@Override @Override
public int getSize() { public int getSize() {
ByteSequence content; String text = this.text;
String text;
synchronized(this) {
content = getContent();
text = this.text;
}
if (size == 0 && content == null && text != null) { if (size == 0 && content == null && text != null) {
size = getMinimumMessageSize(); size = getMinimumMessageSize();
if (marshalledProperties != null) { if (marshalledProperties != null) {