add verification of blocking via full advisory - odd that advisory arrives late, hmm

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1392945 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-10-02 14:55:03 +00:00
parent c83b70c3cd
commit a625f7ab94
1 changed files with 37 additions and 6 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.Connection;
@ -29,6 +30,7 @@ import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -41,7 +43,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class); private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class);
private static final String brokerName = "testBroker"; private static final String brokerName = "testBroker";
private static final String brokerUrl = "vm://" + brokerName; private static final String brokerUrl = "vm://" + brokerName;
private static final int destinationMemLimit = 2097152; // 2MB protected static final int destinationMemLimit = 2097152; // 2MB
private static final AtomicLong produced = new AtomicLong(); private static final AtomicLong produced = new AtomicLong();
private static final AtomicLong consumed = new AtomicLong(); private static final AtomicLong consumed = new AtomicLong();
private static final int numMessagesToSend = 50000; private static final int numMessagesToSend = 50000;
@ -66,6 +68,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
tpe.setTopic(">"); tpe.setTopic(">");
tpe.setMemoryLimit(destinationMemLimit); tpe.setMemoryLimit(destinationMemLimit);
tpe.setProducerFlowControl(true); tpe.setProducerFlowControl(true);
tpe.setAdvisoryWhenFull(true);
// Setup the topic destination policy // Setup the topic destination policy
PolicyEntry qpe = new PolicyEntry(); PolicyEntry qpe = new PolicyEntry();
@ -73,23 +76,27 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
qpe.setMemoryLimit(destinationMemLimit); qpe.setMemoryLimit(destinationMemLimit);
qpe.setProducerFlowControl(true); qpe.setProducerFlowControl(true);
qpe.setQueuePrefetch(1); qpe.setQueuePrefetch(1);
qpe.setAdvisoryWhenFull(true);
pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe, qpe})); pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe, qpe}));
broker.setDestinationPolicy(pm); setDestinationPolicy(broker, pm);
// Start the broker // Start the broker
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();
} }
protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
broker.setDestinationPolicy(pm);
}
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
} }
public void testTopicProducerFlowControl() throws Exception { public void testTopicProducerFlowControl() throws Exception {
Destination destination = new ActiveMQTopic("test");
// Create the connection factory // Create the connection factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
@ -99,7 +106,18 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
// Start the test destination listener // Start the test destination listener
Connection c = connectionFactory.createConnection(); Connection c = connectionFactory.createConnection();
c.start(); c.start();
c.createSession(false, 1).createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest()); Session listenerSession = c.createSession(false, 1);
Destination destination = createDestination(listenerSession);
listenerSession.createConsumer(destination).setMessageListener(new TopicProducerFlowControlTest());
final AtomicInteger blockedCounter = new AtomicInteger(0);
listenerSession.createConsumer(new ActiveMQTopic(AdvisorySupport.FULL_TOPIC_PREFIX + ">")).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
LOG.info("Got full advisory, blockedCounter: " + blockedCounter.get());
blockedCounter.incrementAndGet();
}
});
// Start producing the test messages // Start producing the test messages
final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -112,7 +130,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
producer.send(session.createTextMessage("test")); producer.send(session.createTextMessage("test"));
long count = produced.incrementAndGet(); long count = produced.incrementAndGet();
if (count % 100 == 0) { if (count % 10000 == 0) {
LOG.info("Produced " + count + " messages"); LOG.info("Produced " + count + " messages");
} }
} }
@ -138,17 +156,30 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
assertEquals("Didn't produce all messages", numMessagesToSend, produced.get()); assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get()); assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return blockedCounter.get() > 0;
}
}, 5 * 1000));
}
protected Destination createDestination(Session listenerSession) throws Exception {
return new ActiveMQTopic("test");
} }
@Override @Override
public void onMessage(Message message) { public void onMessage(Message message) {
long count = consumed.incrementAndGet(); long count = consumed.incrementAndGet();
if (count % 100 == 0) { if (count % 100 == 0) {
LOG.info("\tConsumed " + count + " messages");
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
} }
if (count % 10000 == 0) {
LOG.info("\tConsumed " + count + " messages");
}
} }
} }