This closes #357
This commit is contained in:
commit
7673c31083
|
@ -248,7 +248,7 @@ public interface Message {
|
||||||
* Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
|
* Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
|
||||||
* of this buffer should not impact the underlying buffer.
|
* of this buffer should not impact the underlying buffer.
|
||||||
*/
|
*/
|
||||||
ActiveMQBuffer getBodyBufferCopy();
|
ActiveMQBuffer getBodyBufferDuplicate();
|
||||||
|
|
||||||
// Properties
|
// Properties
|
||||||
// -----------------------------------------------------------------
|
// -----------------------------------------------------------------
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.buffers.impl;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||||
|
@ -45,7 +46,13 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
|
||||||
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
|
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
|
||||||
// a wrapped inside a wrapper will increase the stack size.
|
// a wrapped inside a wrapper will increase the stack size.
|
||||||
// we fixed this here due to some profiling testing
|
// we fixed this here due to some profiling testing
|
||||||
super(unwrap(buffer.byteBuf()).duplicate());
|
this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
|
||||||
|
// a wrapped inside a wrapper will increase the stack size.
|
||||||
|
// we fixed this here due to some profiling testing
|
||||||
|
super(buffer);
|
||||||
|
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
|
|
||||||
|
|
|
@ -21,12 +21,14 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.core.message.BodyEncoder;
|
import org.apache.activemq.artemis.core.message.BodyEncoder;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
@ -147,16 +149,20 @@ public abstract class MessageImpl implements MessageInternal {
|
||||||
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
|
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
|
||||||
// many subscriptions and bridging to other nodes in a cluster
|
// many subscriptions and bridging to other nodes in a cluster
|
||||||
synchronized (other) {
|
synchronized (other) {
|
||||||
bufferValid = other.bufferValid;
|
bufferValid = false;
|
||||||
endOfBodyPosition = other.endOfBodyPosition;
|
endOfBodyPosition = -1;
|
||||||
endOfMessagePosition = other.endOfMessagePosition;
|
endOfMessagePosition = other.endOfMessagePosition;
|
||||||
|
|
||||||
if (other.buffer != null) {
|
if (other.buffer != null) {
|
||||||
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
|
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
|
||||||
// properties set on them, making their encoding different
|
// properties set on them, making their encoding different
|
||||||
buffer = other.buffer.copy(0, other.buffer.writerIndex());
|
buffer = other.buffer.copy(0, other.buffer.capacity());
|
||||||
|
|
||||||
buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
|
buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
|
||||||
|
|
||||||
|
bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
|
||||||
|
bodyBuffer.readerIndex(BODY_OFFSET);
|
||||||
|
bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -267,14 +273,16 @@ public abstract class MessageImpl implements MessageInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized ActiveMQBuffer getBodyBufferCopy() {
|
public synchronized ActiveMQBuffer getBodyBufferDuplicate() {
|
||||||
|
|
||||||
// Must copy buffer before sending it
|
// Must copy buffer before sending it
|
||||||
|
|
||||||
ActiveMQBuffer newBuffer = buffer.copy(0, buffer.capacity());
|
ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf());
|
||||||
|
byteBuf = byteBuf.duplicate();
|
||||||
|
byteBuf.writerIndex(getBodyBuffer().writerIndex());
|
||||||
|
byteBuf.readerIndex(getBodyBuffer().readerIndex());
|
||||||
|
|
||||||
newBuffer.setIndex(0, getEndOfBodyPosition());
|
return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null);
|
||||||
|
|
||||||
return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class ServerJMSMessage implements Message {
|
||||||
protected ActiveMQBuffer getReadBodyBuffer() {
|
protected ActiveMQBuffer getReadBodyBuffer() {
|
||||||
if (readBodyBuffer == null) {
|
if (readBodyBuffer == null) {
|
||||||
// to avoid clashes between multiple threads
|
// to avoid clashes between multiple threads
|
||||||
readBodyBuffer = message.getBodyBufferCopy();
|
readBodyBuffer = message.getBodyBufferDuplicate();
|
||||||
}
|
}
|
||||||
return readBodyBuffer;
|
return readBodyBuffer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,7 +144,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.debug("Error processing Control Packet, Disconnecting Client" + e.getMessage());
|
log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage());
|
||||||
disconnect();
|
disconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,8 +216,8 @@ public class MQTTPublishManager {
|
||||||
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
|
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
|
||||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
|
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
|
||||||
|
|
||||||
//FIXME should we be copying the body buffer here?
|
ByteBuf payload = message.getBodyBufferDuplicate().byteBuf();
|
||||||
ByteBuf payload = message.getBodyBufferCopy().byteBuf();
|
|
||||||
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
|
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -492,7 +492,7 @@ public class OpenWireMessageConverter implements MessageConverter {
|
||||||
}
|
}
|
||||||
amqMsg.setBrokerInTime(brokerInTime);
|
amqMsg.setBrokerInTime(brokerInTime);
|
||||||
|
|
||||||
ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
|
ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
|
||||||
Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
|
Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
|
||||||
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
|
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
|
||||||
amqMsg.setCompressed(isCompressed);
|
amqMsg.setCompressed(isCompressed);
|
||||||
|
|
|
@ -293,7 +293,7 @@ public abstract class VersionedStompFrameHandler {
|
||||||
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
|
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
|
||||||
}
|
}
|
||||||
|
|
||||||
ActiveMQBuffer buffer = serverMessage.getBodyBufferCopy();
|
ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
|
||||||
|
|
||||||
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
|
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ public final class OpenTypeSupport {
|
||||||
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
|
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
|
||||||
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
|
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
|
||||||
|
|
||||||
ActiveMQBuffer bodyCopy = m.getBodyBufferCopy();
|
ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
|
||||||
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
||||||
bodyCopy.readBytes(bytes);
|
bodyCopy.readBytes(bytes);
|
||||||
rc.put(CompositeDataConstants.BODY, bytes);
|
rc.put(CompositeDataConstants.BODY, bytes);
|
||||||
|
|
|
@ -293,7 +293,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferCopy().capacity() +
|
return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
|
||||||
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
|
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
|
||||||
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
|
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
|
||||||
}
|
}
|
||||||
|
|
|
@ -570,7 +570,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer getBodyBufferCopy() {
|
public ActiveMQBuffer getBodyBufferDuplicate() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -437,7 +437,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer getBodyBufferCopy() {
|
public ActiveMQBuffer getBodyBufferDuplicate() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue