This closes #1020
This commit is contained in:
commit
57fd708dc9
|
@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket {
|
|||
public ActiveMQBuffer encode(final RemotingConnection connection) {
|
||||
ActiveMQBuffer buffer = message.getEncodedBuffer();
|
||||
|
||||
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
|
||||
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
|
||||
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
|
||||
bufferWrite.writeBytes(buffer, 0, buffer.capacity());
|
||||
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
|
||||
|
||||
// Sanity check
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
|
@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -95,6 +98,50 @@ public class ConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSend() throws Throwable {
|
||||
receive(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendWithCloseConsumer() throws Throwable {
|
||||
receive(true);
|
||||
}
|
||||
|
||||
private void receive(boolean cancelOnce) throws Throwable {
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true, false);
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(QUEUE);
|
||||
|
||||
ClientProducer producer = session.createProducer(QUEUE);
|
||||
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
|
||||
message.getBodyBuffer().writeString("hi");
|
||||
message.putStringProperty("hello", "elo");
|
||||
producer.send(message);
|
||||
|
||||
session.start();
|
||||
|
||||
if (cancelOnce) {
|
||||
final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer;
|
||||
Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
|
||||
consumer.close();
|
||||
consumer = session.createConsumer(QUEUE);
|
||||
}
|
||||
ClientMessage message2 = consumer.receive(1000);
|
||||
|
||||
System.out.println("Id::" + message2.getMessageID());
|
||||
|
||||
System.out.println("Received " + message2);
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase {
|
|||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
session.start();
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
|
|
Loading…
Reference in New Issue