And code to prevent concurrent writes to a message when dispatched to
multiple Topic consumers.
This commit is contained in:
Timothy Bish 2013-12-17 15:22:08 -05:00
parent 7c01c9b581
commit c387e842ee
1 changed files with 19 additions and 4 deletions

View File

@ -129,6 +129,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
updateTracer();
}
@Override
public void updateTracer() {
if (amqpTransport.isTrace()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@ -849,13 +850,27 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
ActiveMQMessage temp = null;
if (md.getMessage() != null) {
org.apache.activemq.command.Message message = md.getMessage();
if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
message.setProperty(MESSAGE_FORMAT_KEY, 0);
// Topics can dispatch the same Message to more than one consumer
// so we must copy to prevent concurrent read / write to the same
// message object.
if (md.getDestination().isTopic()) {
synchronized (md.getMessage()) {
temp = (ActiveMQMessage) md.getMessage().copy();
}
} else {
temp = (ActiveMQMessage) md.getMessage();
}
if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
temp.setProperty(MESSAGE_FORMAT_KEY, 0);
}
}
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
final ActiveMQMessage jms = temp;
if (jms == null) {
// It's the end of browse signal.
endOfBrowse = true;