diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 421a3821d0..2350f9d4e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -17,11 +17,13 @@ package org.apache.activemq.artemis.core.protocol.proton.plug; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -70,6 +72,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private final Executor closeExecutor; + private final AtomicBoolean draining = new AtomicBoolean(false); + public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -88,9 +92,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void onFlowConsumer(Object consumer, int credits) { - // We have our own flow control on AMQP, so we set activemq's flow control to 0 - ((ServerConsumer) consumer).receiveCredits(-1); + public void onFlowConsumer(Object consumer, int credits, final boolean drain) { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; + if (drain) { + // If the draining is already running, then don't do anything + if (draining.compareAndSet(false, true)) { + final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext(); + serverConsumer.forceDelivery(1, new Runnable() { + @Override + public void run() { + try { + plugSender.getSender().drained(); + } + finally { + draining.set(false); + } + } + }); + } + } + else { + serverConsumer.receiveCredits(-1); + } } @Override diff --git a/artemis-protocols/artemis-proton-plug/pom.xml b/artemis-protocols/artemis-proton-plug/pom.xml index b23e08f715..c667e45b8d 100644 --- a/artemis-protocols/artemis-proton-plug/pom.xml +++ b/artemis-protocols/artemis-proton-plug/pom.xml @@ -110,6 +110,23 @@ + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.6 + + + + test-jar + + + + + + bundle diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java index 26d539e1b8..514ee195fa 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java @@ -25,4 +25,10 @@ public interface AMQPClientReceiverContext { ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception; void flow(int credits); + + void drain(int i); + + int drained(); + + boolean isDraining(); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index cce8e0caae..0c0dbe03ba 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -32,7 +32,7 @@ public interface AMQPSessionCallback { void start(); - void onFlowConsumer(Object consumer, int credits); + void onFlowConsumer(Object consumer, int credits, boolean drain); Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 93442de98e..262dc2a198 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -70,6 +70,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl connectionCallback.setConnection(this); this.handler = ProtonHandler.Factory.create(dispatchExecutor); Transport transport = handler.getTransport(); + transport.setEmitFlowEventOnSend(false); if (idleTimeout > 0) { transport.setIdleTimeout(idleTimeout); } @@ -256,7 +257,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl @Override public void onFlow(Link link) throws Exception { - ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit()); + ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain()); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java index 7a4d295340..6b209b83b2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java @@ -51,7 +51,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { this.creditsSemaphore.setCredits(credits); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 8481853a34..4286140b0f 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -69,4 +69,20 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable } connection.flush(); } + + + public void drain(int credits) { + synchronized (connection.getLock()) { + receiver.drain(credits); + } + connection.flush(); + } + + public int drained() { + return receiver.drained(); + } + + public boolean isDraining() { + return receiver.draining(); + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java index 128ea6564a..ad7ff4f307 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java @@ -25,7 +25,7 @@ import org.proton.plug.exceptions.ActiveMQAMQPException; */ public interface ProtonDeliveryHandler { - void onFlow(int currentCredits); + void onFlow(int currentCredits, boolean drain); void onMessage(Delivery delivery) throws ActiveMQAMQPException; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index 6a9ad6ae45..1b32b32e9a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -111,7 +111,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java index ca8dc98860..884af60a38 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java @@ -46,7 +46,7 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } LinkedBlockingDeque queues = new LinkedBlockingDeque<>(); @@ -83,4 +83,5 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception { return queues.poll(time, unit); } + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index 04069198bd..d0f798a52e 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -46,7 +46,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index dfc69dfadd..ae1caa4933 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -65,9 +65,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple } @Override - public void onFlow(int currentCredits) { - super.onFlow(currentCredits); - sessionSPI.onFlowConsumer(brokerConsumer, currentCredits); + public void onFlow(int currentCredits, boolean drain) { + super.onFlow(currentCredits, drain); + sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); } /* diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java index acbb697b75..4c3aaf4e42 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java @@ -36,19 +36,19 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.sasl.ClientSASLPlain; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; import org.proton.plug.test.minimalserver.DumbServer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.proton.plug.util.ByteUtil; /** diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index c702957afe..3578926c50 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void onFlowConsumer(Object consumer, int credits) { + public void onFlowConsumer(Object consumer, int credits, boolean drain) { } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 859b57ddcc..0224c7d1c6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -510,7 +510,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public synchronized void forceDelivery(final long sequence) { + public void forceDelivery(final long sequence) { + forceDelivery(sequence, new Runnable() { + @Override + public void run() { + ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); + + forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); + forcedDeliveryMessage.setAddress(messageQueue.getName()); + + callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + } + }); + } + + public synchronized void forceDelivery(final long sequence, final Runnable r) { promptDelivery(); // JBPAPP-6030 - Using the executor to avoid distributed dead locks @@ -527,17 +541,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { messageQueue.getExecutor().execute(new Runnable() { @Override public void run() { - forceDelivery(sequence); + forceDelivery(sequence, r); } }); } else { - ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); - - forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); - forcedDeliveryMessage.setAddress(messageQueue.getName()); - - callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + r.run(); } } } @@ -546,7 +555,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } }); - } @Override diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index f0e1d14643..38ac4c2cda 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -122,6 +122,12 @@ artemis-amqp-protocol ${project.version} + + org.apache.activemq + artemis-proton-plug + ${project.version} + test-jar + org.apache.activemq artemis-stomp-protocol diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index efd5a85ee1..9534681536 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -40,6 +40,7 @@ import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -50,12 +51,18 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.message.ProtonJMessage; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.test.Constants; +import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @RunWith(Parameterized.class) public class ProtonTest extends ActiveMQTestBase { @@ -214,10 +221,8 @@ public class ProtonTest extends ActiveMQTestBase { /* // Uncomment testLoopBrowser to validate the hunging on the test @Test - public void testLoopBrowser() throws Throwable - { - for (int i = 0 ; i < 1000; i++) - { + public void testLoopBrowser() throws Throwable { + for (int i = 0 ; i < 1000; i++) { System.out.println("#test " + i); testBrowser(); tearDown(); @@ -230,7 +235,7 @@ public class ProtonTest extends ActiveMQTestBase { * * @throws Throwable */ - // @Test TODO: re-enable this when we can get a version free of QPID-4901 bug + //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug public void testBrowser() throws Throwable { boolean success = false; @@ -272,7 +277,7 @@ public class ProtonTest extends ActiveMQTestBase { connection.close(); Assert.assertEquals(getMessageCount(q), numMessages); } - }, 1000); + }, 5000); if (success) { break; @@ -289,6 +294,64 @@ public class ProtonTest extends ActiveMQTestBase { Assert.assertTrue("Test had to interrupt on all occasions.. this is beyond the expected for the test", success); } + @Test + public void testReceiveImmediate() throws Exception { + testReceiveImmediate(1000, 1000); + } + + @Test + public void testReceiveImmediateMoreCredits() throws Exception { + testReceiveImmediate(1000, 100); + } + + @Test + public void testReceiveImmediateMoreMessages() throws Exception { + testReceiveImmediate(100, 1000); + } + + public void testReceiveImmediate(int noCredits, int noMessages) throws Exception { + + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + for (int i = 0; i < noMessages; i++) { + message.setText("msg:" + i); + p.send(message); + } + + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext csession = clientConnection.createClientSession(); + AMQPClientReceiverContext receiver = csession.createReceiver(address); + receiver.drain(noCredits); + + int expectedNumberMessages = noCredits > noMessages ? noMessages : noCredits; + for (int i = 0; i < expectedNumberMessages; i++) { + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.SECONDS); + Assert.assertNotNull(protonJMessage); + } + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS); + Assert.assertNull(protonJMessage); + + assertFalse(receiver.isDraining()); + if (noCredits >= noMessages) { + assertEquals(noCredits - noMessages, receiver.drained()); + } + else { + assertEquals(0, receiver.drained()); + } + } + + @Test public void testConnection() throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);