Allows the class to set a field that isn't public but has a setter method. Fixes the compile warnings as well.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1154189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-05 12:33:10 +00:00
parent c0513811a9
commit d8b5e2d6e2
1 changed files with 62 additions and 48 deletions

View File

@ -41,19 +41,20 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
private final ActiveMQDestination destination = new ActiveMQQueue("test");
BrokerService broker;
Connection connection;
Session session;
MessageProducer producer;
public ActiveMQDestination destination = new ActiveMQQueue("test");
public boolean optimizedDispatch = true;
public PendingQueueMessageStoragePolicy pendingQueuePolicy;
private boolean optimizedDispatch = true;
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private BrokerService broker;
private String connectionUri;
private Connection connection;
private Session session;
private MessageProducer producer;
public static Test suite() {
return suite(ExpiredMessagesWithNoConsumerTest.class);
@ -76,11 +77,11 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
broker.setBrokerName("localhost");
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("tcp://localhost:0");
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setOptimizedDispatch(optimizedDispatch );
defaultEntry.setOptimizedDispatch(optimizedDispatch);
defaultEntry.setExpireMessagesPeriod(800);
defaultEntry.setMaxExpirePageSize(800);
@ -93,11 +94,12 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
}
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void initCombosForTestExpiredMessagesWithNoConsumer() {
@ -109,7 +111,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
createBrokerWithMemoryLimit();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
@ -139,7 +141,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
producingThread.join(1000);
producingThread.join(TimeUnit.SECONDS.toMillis(1000));
return !producingThread.isAlive();
}
}));
@ -157,15 +159,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
assertEquals("All sent have expired", sendCount, view.getExpiredCount());
assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
assertEquals("Not all sent messages have expired", sendCount, view.getExpiredCount());
assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
}
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();
final long queuePrefetch = 600;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@ -183,7 +186,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
try {
LOG.info("Got my message: " + message);
receivedOneCondition.countDown();
waitCondition.await(60, TimeUnit.SECONDS);
waitCondition.await(6, TimeUnit.MINUTES);
LOG.info("acking message: " + message);
message.acknowledge();
} catch (Exception e) {
@ -195,7 +198,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
connection.start();
final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
@ -222,7 +224,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
producingThread.join(1000);
return !producingThread.isAlive();
}
}, Wait.MAX_WAIT_MILLIS * 2));
}, Wait.MAX_WAIT_MILLIS * 10));
final DestinationViewMBean view = createView(destination);
@ -231,7 +233,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@ -255,10 +257,10 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
(queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size didn't get back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues didn't match sent/expired ", sendCount, view.getDequeueCount());
consumer.close();
@ -275,7 +277,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
createBroker();
final long queuePrefetch = 600;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@ -291,11 +294,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
public void onMessage(Message message) {
try {
LOG.info("Got my message: " + message);
if(LOG.isDebugEnabled()) {
LOG.debug("Got my message: " + message);
}
receivedOneCondition.countDown();
received.incrementAndGet();
waitCondition.await(60, TimeUnit.SECONDS);
LOG.info("acking message: " + message);
waitCondition.await(5, TimeUnit.MINUTES);
if(LOG.isDebugEnabled()) {
LOG.debug("acking message: " + message);
}
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
@ -306,7 +313,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
connection.start();
final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
@ -333,16 +339,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
producingThread.join(1000);
return !producingThread.isAlive();
}
}));
}, Wait.MAX_WAIT_MILLIS * 10));
final DestinationViewMBean view = createView(destination);
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@ -366,16 +372,20 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
(queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size doesn't get back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues don't match sent/expired ", sendCount, view.getDequeueCount());
// produce some more
producer.setTimeToLive(0);
long tStamp = System.currentTimeMillis();
for (int i=0; i<sendCount; i++) {
producer.send(session.createTextMessage("test-" + i));
if (i%100 == 0) {
LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
tStamp = System.currentTimeMillis() ;
}
}
Wait.waitFor(new Wait.Condition() {
@ -391,15 +401,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
return 0 == view.getInFlightCount();
}
});
assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
assertEquals("inflight did not go to zeor on close", 0, view.getInFlightCount());
LOG.info("done: " + getName());
}
public void testExpireMessagesForDurableSubscriber() throws Exception {
createBroker();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
connection.setClientID("myConnection");
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -420,20 +429,17 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
DestinationViewMBean view = createView((ActiveMQTopic)destination);
LOG.info("messages sent");
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(0, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
Thread.sleep(4000);
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
final AtomicLong received = new AtomicLong();
sub = session.createDurableSubscriber(destination, "mySub");
sub.setMessageListener(new MessageListener() {
@ -445,7 +451,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
LOG.info("Waiting for messages to arrive");
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
@ -458,11 +463,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertEquals(0, received.get());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
@ -481,7 +483,19 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
broker.waitUntilStopped();
}
public boolean getOptimizedDispatch() {
return this.optimizedDispatch;
}
public void setOptimizedDispatch(boolean option) {
this.optimizedDispatch = option;
}
public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
return this.pendingQueuePolicy;
}
public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) {
this.pendingQueuePolicy = policy;
}
}