diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 01820d6a8f..e2deb802be 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -167,6 +167,10 @@ public class AMQConsumer { } public void acquireCredit(int n) throws Exception { + if (messagePullHandler != null) { + //don't acquire any credits when the pull handler controls it!! + return; + } int oldwindow = currentWindow.getAndAdd(n); boolean promptDelivery = oldwindow < prefetchSize; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index fac4cf3a27..7acc6f6d93 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -65,7 +65,6 @@ import org.apache.activemq.artemis.utils.TypedProperties; * Concrete implementation of a ClientConsumer. */ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { - //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log"); // Constants ------------------------------------------------------------------------------------ private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java index cd50295716..8da4cf7dbb 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java @@ -23,8 +23,11 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import org.apache.activemq.broker.BrokerRegistry; -import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,21 +63,23 @@ public class OptimizedAckTest extends TestSupport { producer.send(session.createTextMessage("Hello" + i)); } - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); MessageConsumer consumer = session.createConsumer(queue); + //check queue delivering count is 10 + ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker(); + Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test")); - assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + final QueueImpl coreQueue = (QueueImpl) binding.getBindable(); + assertTrue("deliverying count is 10", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 10 == coreQueue.getDeliveringCount(); } })); for (int i = 0; i < 6; i++) { javax.jms.Message msg = consumer.receive(4000); assertNotNull(msg); - assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount()); } for (int i = 6; i < 10; i++) { @@ -84,10 +89,11 @@ public class OptimizedAckTest extends TestSupport { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 3 == coreQueue.getDeliveringCount(); } })); } + } public void testVerySlowReceivedMessageStillInflight() throws Exception { @@ -98,15 +104,17 @@ public class OptimizedAckTest extends TestSupport { for (int i = 0; i < 10; i++) { producer.send(session.createTextMessage("Hello" + i)); } - - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); MessageConsumer consumer = session.createConsumer(queue); + //check queue delivering count is 10 + ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker(); + Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test")); + + final QueueImpl coreQueue = (QueueImpl) binding.getBindable(); assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 10 == coreQueue.getDeliveringCount(); } })); @@ -114,7 +122,7 @@ public class OptimizedAckTest extends TestSupport { Thread.sleep(400); javax.jms.Message msg = consumer.receive(4000); assertNotNull(msg); - assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount()); } for (int i = 6; i < 10; i++) { @@ -125,7 +133,7 @@ public class OptimizedAckTest extends TestSupport { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 3 == coreQueue.getDeliveringCount(); } })); } @@ -142,21 +150,22 @@ public class OptimizedAckTest extends TestSupport { producer.send(session.createTextMessage("Hello" + i)); } - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); MessageConsumer consumer = session.createConsumer(queue); + ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker(); + Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test")); + final QueueImpl coreQueue = (QueueImpl) binding.getBindable(); assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 10 == coreQueue.getDeliveringCount(); } })); for (int i = 0; i < 6; i++) { javax.jms.Message msg = consumer.receive(4000); assertNotNull(msg); - assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount()); } for (int i = 6; i < 10; i++) { @@ -165,7 +174,7 @@ public class OptimizedAckTest extends TestSupport { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + return 3 == coreQueue.getDeliveringCount(); } })); } @@ -173,8 +182,8 @@ public class OptimizedAckTest extends TestSupport { assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); + LOG.info("inflight count: " + coreQueue.getDeliveringCount()); + return 0 == coreQueue.getDeliveringCount(); } })); } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java index dad241c85c..763a020e5c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java @@ -47,9 +47,6 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest { protected EmbeddedJMS server1; protected EmbeddedJMS server2; -// protected BrokerService broker1; -// protected BrokerService broker2; - @Test public void testInitialReconnectDelay() throws Exception { @@ -82,6 +79,7 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest { //Inital reconnection should kick in and be darned close to what we expected LOG.info("Failover took " + (end - start) + " ms."); assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000); + connection.close(); } @Test