From f38d5c7dbcb66bebc5b9fba984845bd8d6aadc0c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 15 Feb 2017 12:51:05 -0500 Subject: [PATCH] ARTEMIS-969 Unecessary buffer expansion on message delivery --- .../wireformat/SessionReceiveMessage.java | 4 +- .../integration/client/ConsumerTest.java | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index ce76186531..c21ebda0d9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket { public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); - ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true); - bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity()); + ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true); + bufferWrite.writeBytes(buffer, 0, buffer.capacity()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); // Sanity check diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 1c1f929dab..8f00b2a858 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -34,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.junit.Assert; import org.junit.Before; @@ -95,6 +98,50 @@ public class ConsumerTest extends ActiveMQTestBase { } + @Test + public void testSimpleSend() throws Throwable { + receive(false); + } + + @Test + public void testSimpleSendWithCloseConsumer() throws Throwable { + receive(true); + } + + private void receive(boolean cancelOnce) throws Throwable { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, false); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientConsumer consumer = session.createConsumer(QUEUE); + + ClientProducer producer = session.createProducer(QUEUE); + ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); + message.getBodyBuffer().writeString("hi"); + message.putStringProperty("hello", "elo"); + producer.send(message); + + session.start(); + + if (cancelOnce) { + final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer; + Wait.waitFor(() -> consumerInternal.getBufferSize() > 0); + consumer.close(); + consumer = session.createConsumer(QUEUE); + } + ClientMessage message2 = consumer.receive(1000); + + System.out.println("Id::" + message2.getMessageID()); + + System.out.println("Received " + message2); + + session.close(); + } + + + @Test public void testConsumerAckImmediateAutoCommitTrue() throws Exception { ClientSessionFactory sf = createSessionFactory(locator); @@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true); + session.start(); session.createQueue(QUEUE, QUEUE, null, false);