Fixing ServerMessage's copy and MQTT delivery

This commit is contained in:
Clebert Suconic 2016-01-27 16:17:13 -05:00
parent 392e1c91fa
commit e62a820414
12 changed files with 35 additions and 20 deletions

View File

@ -248,7 +248,7 @@ public interface Message {
* Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
* of this buffer should not impact the underlying buffer. * of this buffer should not impact the underlying buffer.
*/ */
ActiveMQBuffer getBodyBufferCopy(); ActiveMQBuffer getBodyBufferDuplicate();
// Properties // Properties
// ----------------------------------------------------------------- // -----------------------------------------------------------------

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.buffers.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@ -45,7 +46,13 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
// a wrapped inside a wrapper will increase the stack size. // a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing // we fixed this here due to some profiling testing
super(unwrap(buffer.byteBuf()).duplicate()); this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
}
public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
super(buffer);
this.limit = limit; this.limit = limit;

View File

@ -21,12 +21,14 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -147,16 +149,20 @@ public abstract class MessageImpl implements MessageInternal {
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
// many subscriptions and bridging to other nodes in a cluster // many subscriptions and bridging to other nodes in a cluster
synchronized (other) { synchronized (other) {
bufferValid = other.bufferValid; bufferValid = false;
endOfBodyPosition = other.endOfBodyPosition; endOfBodyPosition = -1;
endOfMessagePosition = other.endOfMessagePosition; endOfMessagePosition = other.endOfMessagePosition;
if (other.buffer != null) { if (other.buffer != null) {
// We need to copy the underlying buffer too, since the different messsages thereafter might have different // We need to copy the underlying buffer too, since the different messsages thereafter might have different
// properties set on them, making their encoding different // properties set on them, making their encoding different
buffer = other.buffer.copy(0, other.buffer.writerIndex()); buffer = other.buffer.copy(0, other.buffer.capacity());
buffer.setIndex(other.buffer.readerIndex(), buffer.capacity()); buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
bodyBuffer.readerIndex(BODY_OFFSET);
bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
} }
} }
} }
@ -267,14 +273,16 @@ public abstract class MessageImpl implements MessageInternal {
} }
@Override @Override
public synchronized ActiveMQBuffer getBodyBufferCopy() { public synchronized ActiveMQBuffer getBodyBufferDuplicate() {
// Must copy buffer before sending it // Must copy buffer before sending it
ActiveMQBuffer newBuffer = buffer.copy(0, buffer.capacity()); ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf());
byteBuf = byteBuf.duplicate();
byteBuf.writerIndex(getBodyBuffer().writerIndex());
byteBuf.readerIndex(getBodyBuffer().readerIndex());
newBuffer.setIndex(0, getEndOfBodyPosition()); return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null);
return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null);
} }
@Override @Override

View File

@ -52,7 +52,7 @@ public class ServerJMSMessage implements Message {
protected ActiveMQBuffer getReadBodyBuffer() { protected ActiveMQBuffer getReadBodyBuffer() {
if (readBodyBuffer == null) { if (readBodyBuffer == null) {
// to avoid clashes between multiple threads // to avoid clashes between multiple threads
readBodyBuffer = message.getBodyBufferCopy(); readBodyBuffer = message.getBodyBufferDuplicate();
} }
return readBodyBuffer; return readBodyBuffer;
} }

View File

@ -144,7 +144,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
} }
} }
catch (Exception e) { catch (Exception e) {
log.debug("Error processing Control Packet, Disconnecting Client" + e.getMessage()); log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage());
disconnect(); disconnect();
} }
} }

View File

@ -216,8 +216,8 @@ public class MQTTPublishManager {
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) { private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString(); String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
//FIXME should we be copying the body buffer here? ByteBuf payload = message.getBodyBufferDuplicate().byteBuf();
ByteBuf payload = message.getBodyBufferCopy().byteBuf();
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
} }

View File

@ -492,7 +492,7 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
amqMsg.setBrokerInTime(brokerInTime); amqMsg.setBrokerInTime(brokerInTime);
ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy(); ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed); amqMsg.setCompressed(isCompressed);

View File

@ -293,7 +293,7 @@ public abstract class VersionedStompFrameHandler {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
} }
ActiveMQBuffer buffer = serverMessage.getBodyBufferCopy(); ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition(); int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();

View File

@ -135,7 +135,7 @@ public final class OpenTypeSupport {
rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
ActiveMQBuffer bodyCopy = m.getBodyBufferCopy(); ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
byte[] bytes = new byte[bodyCopy.readableBytes()]; byte[] bytes = new byte[bodyCopy.readableBytes()];
bodyCopy.readBytes(bytes); bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, bytes); rc.put(CompositeDataConstants.BODY, bytes);

View File

@ -293,7 +293,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
@Override @Override
public String toString() { public String toString() {
return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferCopy().capacity() + return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
} }

View File

@ -570,7 +570,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override @Override
public ActiveMQBuffer getBodyBufferCopy() { public ActiveMQBuffer getBodyBufferDuplicate() {
return null; return null;
} }

View File

@ -437,7 +437,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
} }
@Override @Override
public ActiveMQBuffer getBodyBufferCopy() { public ActiveMQBuffer getBodyBufferDuplicate() {
return null; return null;
} }