This closes #514 ARTEMIS-46 Adds AMQP Drain Support

This commit is contained in:
Andy Taylor 2016-05-11 12:09:53 +01:00
commit 0e0dec6649
17 changed files with 176 additions and 35 deletions

View File

@ -17,11 +17,13 @@
package org.apache.activemq.artemis.core.protocol.proton.plug;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -70,6 +72,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private final Executor closeExecutor;
private final AtomicBoolean draining = new AtomicBoolean(false);
public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@ -88,9 +92,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
public void onFlowConsumer(Object consumer, int credits) {
// We have our own flow control on AMQP, so we set activemq's flow control to 0
((ServerConsumer) consumer).receiveCredits(-1);
public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
if (drain) {
// If the draining is already running, then don't do anything
if (draining.compareAndSet(false, true)) {
final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext();
serverConsumer.forceDelivery(1, new Runnable() {
@Override
public void run() {
try {
plugSender.getSender().drained();
}
finally {
draining.set(false);
}
}
});
}
}
else {
serverConsumer.receiveCredits(-1);
}
}
@Override

View File

@ -110,6 +110,23 @@
</dependencies>
<!-- We use the proton plug test classes in some of the Artemis Integration tests -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<packaging>bundle</packaging>
</project>

View File

@ -25,4 +25,10 @@ public interface AMQPClientReceiverContext {
ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception;
void flow(int credits);
void drain(int i);
int drained();
boolean isDraining();
}

View File

@ -32,7 +32,7 @@ public interface AMQPSessionCallback {
void start();
void onFlowConsumer(Object consumer, int credits);
void onFlowConsumer(Object consumer, int credits, boolean drain);
Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception;

View File

@ -70,6 +70,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
connectionCallback.setConnection(this);
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
if (idleTimeout > 0) {
transport.setIdleTimeout(idleTimeout);
}
@ -256,7 +257,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
@Override
public void onFlow(Link link) throws Exception {
((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit());
((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
}
@Override

View File

@ -51,7 +51,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
}
@Override
public void onFlow(int credits) {
public void onFlow(int credits, boolean drain) {
this.creditsSemaphore.setCredits(credits);
}

View File

@ -69,4 +69,20 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
}
connection.flush();
}
public void drain(int credits) {
synchronized (connection.getLock()) {
receiver.drain(credits);
}
connection.flush();
}
public int drained() {
return receiver.drained();
}
public boolean isDraining() {
return receiver.draining();
}
}

View File

@ -25,7 +25,7 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
*/
public interface ProtonDeliveryHandler {
void onFlow(int currentCredits);
void onFlow(int currentCredits, boolean drain);
void onMessage(Delivery delivery) throws ActiveMQAMQPException;

View File

@ -111,7 +111,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
@Override
public void onFlow(int credits) {
public void onFlow(int credits, boolean drain) {
}

View File

@ -46,7 +46,7 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i
}
@Override
public void onFlow(int credits) {
public void onFlow(int credits, boolean drain) {
}
LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>();
@ -83,4 +83,5 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i
public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception {
return queues.poll(time, unit);
}
}

View File

@ -46,7 +46,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
}
@Override
public void onFlow(int credits) {
public void onFlow(int credits, boolean drain) {
}
@Override

View File

@ -65,9 +65,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
@Override
public void onFlow(int currentCredits) {
super.onFlow(currentCredits);
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits);
public void onFlow(int currentCredits, boolean drain) {
super.onFlow(currentCredits, drain);
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
}
/*

View File

@ -36,19 +36,19 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.sasl.ClientSASLPlain;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
import org.proton.plug.test.minimalserver.DumbServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.proton.plug.util.ByteUtil;
/**

View File

@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void onFlowConsumer(Object consumer, int credits) {
public void onFlowConsumer(Object consumer, int credits, boolean drain) {
}
@Override

View File

@ -510,7 +510,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* there are no other messages to be delivered.
*/
@Override
public synchronized void forceDelivery(final long sequence) {
public void forceDelivery(final long sequence) {
forceDelivery(sequence, new Runnable() {
@Override
public void run() {
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
}
});
}
public synchronized void forceDelivery(final long sequence, final Runnable r) {
promptDelivery();
// JBPAPP-6030 - Using the executor to avoid distributed dead locks
@ -527,17 +541,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
messageQueue.getExecutor().execute(new Runnable() {
@Override
public void run() {
forceDelivery(sequence);
forceDelivery(sequence, r);
}
});
}
else {
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
r.run();
}
}
}
@ -546,7 +555,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
});
}
@Override

View File

@ -122,6 +122,12 @@
<artifactId>artemis-amqp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-proton-plug</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>

View File

@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -50,12 +51,18 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@RunWith(Parameterized.class)
public class ProtonTest extends ActiveMQTestBase {
@ -214,10 +221,8 @@ public class ProtonTest extends ActiveMQTestBase {
/*
// Uncomment testLoopBrowser to validate the hunging on the test
@Test
public void testLoopBrowser() throws Throwable
{
for (int i = 0 ; i < 1000; i++)
{
public void testLoopBrowser() throws Throwable {
for (int i = 0 ; i < 1000; i++) {
System.out.println("#test " + i);
testBrowser();
tearDown();
@ -230,7 +235,7 @@ public class ProtonTest extends ActiveMQTestBase {
*
* @throws Throwable
*/
// @Test TODO: re-enable this when we can get a version free of QPID-4901 bug
//@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug
public void testBrowser() throws Throwable {
boolean success = false;
@ -272,7 +277,7 @@ public class ProtonTest extends ActiveMQTestBase {
connection.close();
Assert.assertEquals(getMessageCount(q), numMessages);
}
}, 1000);
}, 5000);
if (success) {
break;
@ -289,6 +294,64 @@ public class ProtonTest extends ActiveMQTestBase {
Assert.assertTrue("Test had to interrupt on all occasions.. this is beyond the expected for the test", success);
}
@Test
public void testReceiveImmediate() throws Exception {
testReceiveImmediate(1000, 1000);
}
@Test
public void testReceiveImmediateMoreCredits() throws Exception {
testReceiveImmediate(1000, 100);
}
@Test
public void testReceiveImmediateMoreMessages() throws Exception {
testReceiveImmediate(100, 1000);
}
public void testReceiveImmediate(int noCredits, int noMessages) throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = createQueue(address);
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("Message temporary");
for (int i = 0; i < noMessages; i++) {
message.setText("msg:" + i);
p.send(message);
}
SimpleAMQPConnector connector = new SimpleAMQPConnector();
connector.start();
AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
clientConnection.clientOpen(null);
AMQPClientSessionContext csession = clientConnection.createClientSession();
AMQPClientReceiverContext receiver = csession.createReceiver(address);
receiver.drain(noCredits);
int expectedNumberMessages = noCredits > noMessages ? noMessages : noCredits;
for (int i = 0; i < expectedNumberMessages; i++) {
ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.SECONDS);
Assert.assertNotNull(protonJMessage);
}
ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS);
Assert.assertNull(protonJMessage);
assertFalse(receiver.isDraining());
if (noCredits >= noMessages) {
assertEquals(noCredits - noMessages, receiver.drained());
}
else {
assertEquals(0, receiver.drained());
}
}
@Test
public void testConnection() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);