diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java index c9345b115d..4618341ee3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java @@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; - protected volatile String text; + protected String text; @Override public Message copy() { @@ -55,10 +55,14 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage } private void copy(ActiveMQTextMessage copy) { - synchronized(this) { - super.copy(copy); - copy.text = text; - } + //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when + //concurrent store and dispatch is enabled in KahaDB + //The issue is sometimes beforeMarshall() gets called in between the time content and + //text are copied to the new object leading to both fields being null when text should + //not be null + String text = this.text; + super.copy(copy); + copy.text = text; } @Override @@ -73,30 +77,21 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage @Override public void setText(String text) throws MessageNotWriteableException { - synchronized(this) { - checkReadOnlyBody(); - this.text = text; - setContent(null); - } + checkReadOnlyBody(); + this.text = text; + setContent(null); } @Override public String getText() throws JMSException { - ByteSequence content; - String text; - - synchronized(this) { - content = getContent(); - text = this.text; - } + ByteSequence content = getContent(); + String text = this.text; if (text == null && content != null) { text = decodeContent(content); - synchronized(this) { - this.text = text; - setContent(null); - setCompressed(false); - } + this.text = text; + setContent(null); + setCompressed(false); } return text; } @@ -136,43 +131,16 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage @Override public void storeContentAndClear() { - ByteSequence content; - String text; - synchronized(this) { - content = getContent(); - text = this.text; - } - if (content == null && text != null) { - content = marshallContent(text); - } - synchronized(this) { - setContent(content); - text=null; - } + storeContent(); + text=null; } @Override public void storeContent() { - ByteSequence content; - String text; - synchronized(this) { - content = getContent(); - text = this.text; - } - - if (content == null && text != null) { - content = marshallContent(text); - } - - synchronized(this) { - setContent(content); - } - } - - private ByteSequence marshallContent(String text) { - ByteSequence content = null; try { - if (text != null) { + ByteSequence content = getContent(); + String text = this.text; + if (content == null && text != null) { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); OutputStream os = bytesOut; ActiveMQConnection connection = getConnection(); @@ -183,23 +151,19 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage DataOutputStream dataOut = new DataOutputStream(os); MarshallingSupport.writeUTF8(dataOut, text); dataOut.close(); - content = bytesOut.toByteSequence(); + setContent(bytesOut.toByteSequence()); } } catch (IOException e) { throw new RuntimeException(e); } - return content; } - // see https://issues.apache.org/activemq/browse/AMQ-2103 // and https://issues.apache.org/activemq/browse/AMQ-2966 @Override public void clearMarshalledState() throws JMSException { - synchronized(this) { - super.clearMarshalledState(); - this.text = null; - } + super.clearMarshalledState(); + this.text = null; } /** @@ -215,20 +179,13 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage */ @Override public void clearBody() throws JMSException { - synchronized(this) { - super.clearBody(); - this.text = null; - } + super.clearBody(); + this.text = null; } @Override public int getSize() { - ByteSequence content; - String text; - synchronized(this) { - content = getContent(); - text = this.text; - } + String text = this.text; if (size == 0 && content == null && text != null) { size = getMinimumMessageSize(); if (marshalledProperties != null) {