ARTEMIS-969 Unecessary buffer expansion on message delivery
This commit is contained in:
parent
7a0a376dd4
commit
f38d5c7dbc
|
@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket {
|
||||||
public ActiveMQBuffer encode(final RemotingConnection connection) {
|
public ActiveMQBuffer encode(final RemotingConnection connection) {
|
||||||
ActiveMQBuffer buffer = message.getEncodedBuffer();
|
ActiveMQBuffer buffer = message.getEncodedBuffer();
|
||||||
|
|
||||||
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
|
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
|
||||||
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
|
bufferWrite.writeBytes(buffer, 0, buffer.capacity());
|
||||||
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
|
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
|
||||||
|
|
||||||
// Sanity check
|
// Sanity check
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -95,6 +98,50 @@ public class ConsumerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSend() throws Throwable {
|
||||||
|
receive(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendWithCloseConsumer() throws Throwable {
|
||||||
|
receive(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receive(boolean cancelOnce) throws Throwable {
|
||||||
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
||||||
|
ClientSession session = sf.createSession(false, true, true, false);
|
||||||
|
|
||||||
|
session.createQueue(QUEUE, QUEUE, null, false);
|
||||||
|
|
||||||
|
ClientConsumer consumer = session.createConsumer(QUEUE);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(QUEUE);
|
||||||
|
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
|
||||||
|
message.getBodyBuffer().writeString("hi");
|
||||||
|
message.putStringProperty("hello", "elo");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
if (cancelOnce) {
|
||||||
|
final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer;
|
||||||
|
Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
|
||||||
|
consumer.close();
|
||||||
|
consumer = session.createConsumer(QUEUE);
|
||||||
|
}
|
||||||
|
ClientMessage message2 = consumer.receive(1000);
|
||||||
|
|
||||||
|
System.out.println("Id::" + message2.getMessageID());
|
||||||
|
|
||||||
|
System.out.println("Received " + message2);
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
|
public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase {
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
||||||
ClientSession session = sf.createSession(false, true, true);
|
ClientSession session = sf.createSession(false, true, true);
|
||||||
|
session.start();
|
||||||
|
|
||||||
session.createQueue(QUEUE, QUEUE, null, false);
|
session.createQueue(QUEUE, QUEUE, null, false);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue