This closes #492

This commit is contained in:
Clebert Suconic 2016-04-27 10:06:14 -04:00
commit 2135911eaa
4 changed files with 35 additions and 25 deletions

View File

@ -167,6 +167,10 @@ public class AMQConsumer {
} }
public void acquireCredit(int n) throws Exception { public void acquireCredit(int n) throws Exception {
if (messagePullHandler != null) {
//don't acquire any credits when the pull handler controls it!!
return;
}
int oldwindow = currentWindow.getAndAdd(n); int oldwindow = currentWindow.getAndAdd(n);
boolean promptDelivery = oldwindow < prefetchSize; boolean promptDelivery = oldwindow < prefetchSize;

View File

@ -65,7 +65,6 @@ import org.apache.activemq.artemis.utils.TypedProperties;
* Concrete implementation of a ClientConsumer. * Concrete implementation of a ClientConsumer.
*/ */
public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
//private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
// Constants ------------------------------------------------------------------------------------ // Constants ------------------------------------------------------------------------------------
private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();

View File

@ -23,8 +23,11 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -60,21 +63,23 @@ public class OptimizedAckTest extends TestSupport {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
//check queue delivering count is 10
ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
assertTrue("deliverying count is 10", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); return 10 == coreQueue.getDeliveringCount();
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
for (int i = 0; i < 6; i++) { for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg); assertNotNull(msg);
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
} }
for (int i = 6; i < 10; i++) { for (int i = 6; i < 10; i++) {
@ -84,10 +89,11 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == coreQueue.getDeliveringCount();
} }
})); }));
} }
} }
public void testVerySlowReceivedMessageStillInflight() throws Exception { public void testVerySlowReceivedMessageStillInflight() throws Exception {
@ -98,15 +104,17 @@ public class OptimizedAckTest extends TestSupport {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
//check queue delivering count is 10
ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); return 10 == coreQueue.getDeliveringCount();
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
@ -114,7 +122,7 @@ public class OptimizedAckTest extends TestSupport {
Thread.sleep(400); Thread.sleep(400);
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg); assertNotNull(msg);
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
} }
for (int i = 6; i < 10; i++) { for (int i = 6; i < 10; i++) {
@ -125,7 +133,7 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == coreQueue.getDeliveringCount();
} }
})); }));
} }
@ -142,21 +150,22 @@ public class OptimizedAckTest extends TestSupport {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); return 10 == coreQueue.getDeliveringCount();
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
for (int i = 0; i < 6; i++) { for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg); assertNotNull(msg);
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
} }
for (int i = 6; i < 10; i++) { for (int i = 6; i < 10; i++) {
@ -165,7 +174,7 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == coreQueue.getDeliveringCount();
} }
})); }));
} }
@ -173,8 +182,8 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() { assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + coreQueue.getDeliveringCount());
return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 0 == coreQueue.getDeliveringCount();
} }
})); }));
} }

View File

@ -47,9 +47,6 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
protected EmbeddedJMS server1; protected EmbeddedJMS server1;
protected EmbeddedJMS server2; protected EmbeddedJMS server2;
// protected BrokerService broker1;
// protected BrokerService broker2;
@Test @Test
public void testInitialReconnectDelay() throws Exception { public void testInitialReconnectDelay() throws Exception {
@ -82,6 +79,7 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
//Inital reconnection should kick in and be darned close to what we expected //Inital reconnection should kick in and be darned close to what we expected
LOG.info("Failover took " + (end - start) + " ms."); LOG.info("Failover took " + (end - start) + " ms.");
assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000); assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
connection.close();
} }
@Test @Test