mirror of https://github.com/apache/activemq.git
Remove need for hard sleep
This commit is contained in:
parent
0683d8be94
commit
709b64b3d2
|
@ -16,9 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.bugs;
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -30,13 +33,14 @@ import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.BrokerView;
|
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||||
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy;
|
import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy;
|
||||||
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
|
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
|
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -84,7 +88,7 @@ public class AMQ4656Test {
|
||||||
brokerService.waitUntilStopped();
|
brokerService.waitUntilStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 90000)
|
||||||
public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception {
|
public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception {
|
||||||
|
|
||||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
@ -98,12 +102,10 @@ public class AMQ4656Test {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
|
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
|
||||||
|
|
||||||
BrokerView view = brokerService.getAdminView();
|
final BrokerViewMBean brokerView = brokerService.getAdminView();
|
||||||
view.getDurableTopicSubscribers();
|
ObjectName subName = brokerView.getDurableTopicSubscribers()[0];
|
||||||
|
|
||||||
ObjectName subName = view.getDurableTopicSubscribers()[0];
|
final DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
|
||||||
|
|
||||||
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
|
|
||||||
brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
|
brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
|
||||||
|
|
||||||
assertEquals(0, sub.getEnqueueCounter());
|
assertEquals(0, sub.getEnqueueCounter());
|
||||||
|
@ -122,7 +124,21 @@ public class AMQ4656Test {
|
||||||
|
|
||||||
consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
|
consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
|
||||||
|
|
||||||
Thread.sleep(1000);
|
assertTrue("Should be an Active Subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerView.getDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(25)));
|
||||||
|
|
||||||
|
assertTrue("Should all be dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return sub.getDispatchedCounter() == 20;
|
||||||
|
}
|
||||||
|
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(25)));
|
||||||
|
|
||||||
assertEquals(20, sub.getEnqueueCounter());
|
assertEquals(20, sub.getEnqueueCounter());
|
||||||
assertEquals(0, sub.getDequeueCounter());
|
assertEquals(0, sub.getDequeueCounter());
|
||||||
|
@ -137,15 +153,32 @@ public class AMQ4656Test {
|
||||||
|
|
||||||
consumer.close();
|
consumer.close();
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize());
|
LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize());
|
||||||
|
|
||||||
assertEquals(20, sub.getEnqueueCounter());
|
assertTrue("Should be an Active Subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
assertEquals(2, sub.getDequeueCounter());
|
|
||||||
assertEquals(18, sub.getPendingQueueSize());
|
@Override
|
||||||
assertEquals(20, sub.getDispatchedCounter());
|
public boolean isSatisified() throws Exception {
|
||||||
assertEquals(0, sub.getDispatchedQueueSize());
|
return brokerView.getInactiveDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(25)));
|
||||||
|
|
||||||
|
final DurableSubscriptionViewMBean inactive = (DurableSubscriptionViewMBean)
|
||||||
|
brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
|
||||||
|
|
||||||
|
assertTrue("Should all be dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return inactive.getDequeueCounter() == 2;
|
||||||
|
}
|
||||||
|
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(25)));
|
||||||
|
|
||||||
|
assertEquals(20, inactive.getEnqueueCounter());
|
||||||
|
assertEquals(2, inactive.getDequeueCounter());
|
||||||
|
assertEquals(18, inactive.getPendingQueueSize());
|
||||||
|
assertEquals(20, inactive.getDispatchedCounter());
|
||||||
|
assertEquals(0, inactive.getDispatchedQueueSize());
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
Loading…
Reference in New Issue