This commit is contained in:
Clebert Suconic 2018-06-18 14:53:53 -04:00
commit ad4844e8ec
3 changed files with 111 additions and 3 deletions

View File

@ -689,6 +689,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
// Let the Message decide how to present the message bytes
boolean attemptRelease = true;
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
try {
@ -712,7 +713,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
sender.sendNoCopy(sendBuffer);
if (sendBuffer instanceof NettyReadable) {
sender.send(sendBuffer);
// Above send copied, so release now if needed
attemptRelease = false;
((NettyReadable) sendBuffer).getByteBuf().release();
} else {
// Don't have pooled content, no need to release or copy.
attemptRelease = false;
sender.sendNoCopy(sendBuffer);
}
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
@ -728,7 +738,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return size;
} finally {
if (sendBuffer instanceof NettyReadable) {
if (attemptRelease && sendBuffer instanceof NettyReadable) {
((NettyReadable) sendBuffer).getByteBuf().release();
}
}

View File

@ -49,6 +49,7 @@ import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
@ -105,6 +106,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private boolean authenticated;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int channelMax = DEFAULT_CHANNEL_MAX;
private int sessionIncomingCapacity = 0;
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
@ -279,7 +281,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
@Override
public void run() {
checkClosed();
session.setEndpoint(getEndpoint().session());
Session protonSession = getEndpoint().session();
protonSession.setIncomingCapacity(sessionIncomingCapacity);
session.setEndpoint(protonSession);
session.setStateInspector(getStateInspector());
session.open(request);
pumpToProtonTransport(request);
@ -383,6 +387,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.channelMax = channelMax;
}
public int getSessionIncomingCapacity() {
return sessionIncomingCapacity;
}
public void setSessionIncomingCapacity(int capacity) {
this.sessionIncomingCapacity = capacity;
}
public long getConnectTimeout() {
return connectTimeout;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
@ -39,6 +40,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -46,7 +48,10 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Ignore;
@ -65,6 +70,13 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
String testQueueName = "ConnectionFrameSize";
@Override
protected void addConfiguration(ActiveMQServer server) {
// Make the journal file size larger than the frame+message sizes used in the tests,
// since it is by default for external brokers and it changes the behaviour.
server.getConfiguration().setJournalFileSize(5 * 1024 * 1024);
}
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("maxFrameSize", FRAME_SIZE);
@ -325,6 +337,80 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testReceiveRedeliveredLargeMessagesWithSessionFlowControl() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int numMsgs = 10;
int msgSize = 2_000_000;
int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
int sessionCapacity = 2_500_000; // Restrict session to 1.x messages in flight at once, make it likely send is partial.
byte[] payload = createLargePayload(msgSize);
assertEquals(msgSize, payload.length);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setMaxFrameSize(maxFrameSize);
connection.setSessionIncomingCapacity(sessionCapacity);
connection.connect();
addConnection(connection);
try {
String testQueueName = getTestName();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.send(message);
}
Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueName), 5000, 10);
AmqpReceiver receiver = session.createReceiver(testQueueName);
receiver.flow(numMsgs);
ArrayList<AmqpMessage> messages = new ArrayList<>();
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("failed at " + i, message);
messages.add(message);
}
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage msg = messages.get(i);
msg.modified(true, false);
}
receiver.close();
AmqpReceiver receiver2 = session.createReceiver(testQueueName);
receiver2.flow(numMsgs);
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull("failed at " + i, message);
Section body = message.getWrappedMessage().getBody();
assertNotNull("No message body for msg " + i, body);
//TODO: ARTEMIS-1941 raised. This is wrong, test sent a Data section, it got converted in transit.
assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof AmqpValue);
assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((AmqpValue) body).getValue());
message.accept();
}
session.close();
} finally {
connection.close();
}
}
private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();