From e62a820414bae8e5bfaf52febdb6f0edca19489e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 27 Jan 2016 16:17:13 -0500 Subject: [PATCH] Fixing ServerMessage's copy and MQTT delivery --- .../activemq/artemis/api/core/Message.java | 2 +- .../impl/ResetLimitWrappedActiveMQBuffer.java | 9 ++++++- .../core/message/impl/MessageImpl.java | 24 ++++++++++++------- .../converter/jms/ServerJMSMessage.java | 2 +- .../protocol/mqtt/MQTTProtocolHandler.java | 2 +- .../protocol/mqtt/MQTTPublishManager.java | 4 ++-- .../openwire/OpenWireMessageConverter.java | 2 +- .../stomp/VersionedStompFrameHandler.java | 2 +- .../impl/openmbean/OpenTypeSupport.java | 2 +- .../core/server/impl/ServerMessageImpl.java | 2 +- .../impl/ScheduledDeliveryHandlerTest.java | 2 +- .../integration/client/AcknowledgeTest.java | 2 +- 12 files changed, 35 insertions(+), 20 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index f5cb55bd6c..5bb6f42d69 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -248,7 +248,7 @@ public interface Message { * Returns a copy of the message body as an ActiveMQBuffer. Any modification * of this buffer should not impact the underlying buffer. */ - ActiveMQBuffer getBodyBufferCopy(); + ActiveMQBuffer getBodyBufferDuplicate(); // Properties // ----------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index 61ecef613b..ec6cf092e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.buffers.impl; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; @@ -45,7 +46,13 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { // a wrapped inside a wrapper will increase the stack size. // we fixed this here due to some profiling testing - super(unwrap(buffer.byteBuf()).duplicate()); + this(limit, unwrap(buffer.byteBuf()).duplicate(), message); + } + + public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) { + // a wrapped inside a wrapper will increase the stack size. + // we fixed this here due to some profiling testing + super(buffer); this.limit = limit; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index ca526211b8..7583ce2f19 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -21,12 +21,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -147,16 +149,20 @@ public abstract class MessageImpl implements MessageInternal { // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to // many subscriptions and bridging to other nodes in a cluster synchronized (other) { - bufferValid = other.bufferValid; - endOfBodyPosition = other.endOfBodyPosition; + bufferValid = false; + endOfBodyPosition = -1; endOfMessagePosition = other.endOfMessagePosition; if (other.buffer != null) { // We need to copy the underlying buffer too, since the different messsages thereafter might have different // properties set on them, making their encoding different - buffer = other.buffer.copy(0, other.buffer.writerIndex()); + buffer = other.buffer.copy(0, other.buffer.capacity()); buffer.setIndex(other.buffer.readerIndex(), buffer.capacity()); + + bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); + bodyBuffer.readerIndex(BODY_OFFSET); + bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex()); } } } @@ -267,14 +273,16 @@ public abstract class MessageImpl implements MessageInternal { } @Override - public synchronized ActiveMQBuffer getBodyBufferCopy() { + public synchronized ActiveMQBuffer getBodyBufferDuplicate() { + // Must copy buffer before sending it - ActiveMQBuffer newBuffer = buffer.copy(0, buffer.capacity()); + ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf()); + byteBuf = byteBuf.duplicate(); + byteBuf.writerIndex(getBodyBuffer().writerIndex()); + byteBuf.readerIndex(getBodyBuffer().readerIndex()); - newBuffer.setIndex(0, getEndOfBodyPosition()); - - return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null); + return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java index 1686ea7837..7902fa0df6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -52,7 +52,7 @@ public class ServerJMSMessage implements Message { protected ActiveMQBuffer getReadBodyBuffer() { if (readBodyBuffer == null) { // to avoid clashes between multiple threads - readBodyBuffer = message.getBodyBufferCopy(); + readBodyBuffer = message.getBodyBufferDuplicate(); } return readBodyBuffer; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index c38593c2f0..57c2b57c63 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -144,7 +144,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } } catch (Exception e) { - log.debug("Error processing Control Packet, Disconnecting Client" + e.getMessage()); + log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage()); disconnect(); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index ac4042076f..b0df5a2c19 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -216,8 +216,8 @@ public class MQTTPublishManager { private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) { String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString(); - //FIXME should we be copying the body buffer here? - ByteBuf payload = message.getBodyBufferCopy().byteBuf(); + ByteBuf payload = message.getBodyBufferDuplicate().byteBuf(); + session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 6d3ff135bb..2b863c187d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -492,7 +492,7 @@ public class OpenWireMessageConverter implements MessageConverter { } amqMsg.setBrokerInTime(brokerInTime); - ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy(); + ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate(); Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); amqMsg.setCompressed(isCompressed); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 1e0206a0a8..5da8574786 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -293,7 +293,7 @@ public abstract class VersionedStompFrameHandler { frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); } - ActiveMQBuffer buffer = serverMessage.getBodyBufferCopy(); + ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate(); int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index 1f3a109ff0..7bb37647df 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -135,7 +135,7 @@ public final class OpenTypeSupport { rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); - ActiveMQBuffer bodyCopy = m.getBodyBufferCopy(); + ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate(); byte[] bytes = new byte[bodyCopy.readableBytes()]; bodyCopy.readBytes(bytes); rc.put(CompositeDataConstants.BODY, bytes); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java index 645d3263ff..e7a7e67d85 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java @@ -293,7 +293,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { @Override public String toString() { - return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferCopy().capacity() + + return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index e3d6850b91..c54881d26c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -570,7 +570,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public ActiveMQBuffer getBodyBufferCopy() { + public ActiveMQBuffer getBodyBufferDuplicate() { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 6067f94658..476f191a9a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -437,7 +437,7 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public ActiveMQBuffer getBodyBufferCopy() { + public ActiveMQBuffer getBodyBufferDuplicate() { return null; }