mirror of https://github.com/apache/activemq.git
Update the test case so that its not dependent on port 61616
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1086378 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
442232514c
commit
b0a38ff6c1
|
@ -50,7 +50,7 @@ import static org.apache.activemq.TestSupport.getDestinationStatistics;
|
|||
public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesTest.class);
|
||||
|
||||
|
||||
BrokerService broker;
|
||||
Connection connection;
|
||||
Session session;
|
||||
|
@ -60,7 +60,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
||||
public boolean useTextMessage = true;
|
||||
public boolean useVMCursor = true;
|
||||
|
||||
protected String brokerUri;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(ExpiredMessagesTest.class);
|
||||
}
|
||||
|
@ -68,78 +69,79 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
public static void main(String[] args) {
|
||||
junit.textui.TestRunner.run(suite());
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
final boolean deleteAllMessages = true;
|
||||
broker = createBroker(deleteAllMessages, 100);
|
||||
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
||||
}
|
||||
|
||||
public void testExpiredMessages() throws Exception {
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(destination);
|
||||
producer.setTimeToLive(100);
|
||||
consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
final AtomicLong received = new AtomicLong();
|
||||
|
||||
Thread consumerThread = new Thread("Consumer Thread") {
|
||||
public void run() {
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
long end = System.currentTimeMillis();
|
||||
while (end - start < 3000) {
|
||||
if (consumer.receive(1000) != null) {
|
||||
received.incrementAndGet();
|
||||
}
|
||||
Thread.sleep(100);
|
||||
end = System.currentTimeMillis();
|
||||
}
|
||||
consumer.close();
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
consumerThread.start();
|
||||
|
||||
final int numMessagesToSend = 10000;
|
||||
Thread producingThread = new Thread("Producing Thread") {
|
||||
|
||||
public void testExpiredMessages() throws Exception {
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(destination);
|
||||
producer.setTimeToLive(100);
|
||||
consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
final AtomicLong received = new AtomicLong();
|
||||
|
||||
Thread consumerThread = new Thread("Consumer Thread") {
|
||||
public void run() {
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
int i = 0;
|
||||
while (i++ < numMessagesToSend) {
|
||||
producer.send(session.createTextMessage("test"));
|
||||
}
|
||||
producer.close();
|
||||
long end = System.currentTimeMillis();
|
||||
while (end - start < 3000) {
|
||||
if (consumer.receive(1000) != null) {
|
||||
received.incrementAndGet();
|
||||
}
|
||||
Thread.sleep(100);
|
||||
end = System.currentTimeMillis();
|
||||
}
|
||||
consumer.close();
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
producingThread.start();
|
||||
|
||||
};
|
||||
|
||||
consumerThread.start();
|
||||
|
||||
final int numMessagesToSend = 10000;
|
||||
Thread producingThread = new Thread("Producing Thread") {
|
||||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
while (i++ < numMessagesToSend) {
|
||||
producer.send(session.createTextMessage("test"));
|
||||
}
|
||||
producer.close();
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
producingThread.start();
|
||||
|
||||
consumerThread.join();
|
||||
producingThread.join();
|
||||
session.close();
|
||||
|
||||
|
||||
final DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||
|
||||
// wait for all to inflight to expire
|
||||
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return view.getInflight().getCount() == 0;
|
||||
}
|
||||
}
|
||||
}));
|
||||
assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
|
||||
|
||||
|
||||
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
|
||||
+ ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
|
||||
|
||||
|
||||
// wait for all sent to get delivered and expire
|
||||
assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
|
@ -148,15 +150,15 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount()
|
||||
+ ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
|
||||
return oldEnqueues == view.getEnqueues().getCount();
|
||||
}
|
||||
}
|
||||
}, 60*1000));
|
||||
|
||||
|
||||
|
||||
LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
|
||||
+ ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
|
||||
|
||||
|
||||
assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount() - view.getExpired().getCount());
|
||||
|
||||
|
||||
assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
|
||||
|
@ -164,61 +166,61 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
return view.getMessages().getCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
|
||||
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
|
||||
final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
|
||||
|
||||
|
||||
final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination);
|
||||
LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
|
||||
+ ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
|
||||
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return totalExpiredCount == dlqView.getMessages().getCount();
|
||||
}
|
||||
});
|
||||
assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
|
||||
|
||||
|
||||
// memory check
|
||||
assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
|
||||
assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
|
||||
|
||||
assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
|
||||
|
||||
// verify DLQ
|
||||
MessageConsumer dlqConsumer = createDlqConsumer(connection);
|
||||
final DLQListener dlqListener = new DLQListener();
|
||||
dlqConsumer.setMessageListener(dlqListener);
|
||||
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return totalExpiredCount == dlqListener.count;
|
||||
}
|
||||
}, 60 * 1000);
|
||||
|
||||
|
||||
assertEquals("dlq returned all expired", dlqListener.count, totalExpiredCount);
|
||||
}
|
||||
}
|
||||
|
||||
class DLQListener implements MessageListener {
|
||||
|
||||
|
||||
int count = 0;
|
||||
|
||||
|
||||
public void onMessage(Message message) {
|
||||
count++;
|
||||
}
|
||||
|
||||
|
||||
};
|
||||
|
||||
private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
|
||||
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
|
||||
|
||||
private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
|
||||
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
|
||||
}
|
||||
|
||||
public void initCombosForTestRecoverExpiredMessages() {
|
||||
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testRecoverExpiredMessages() throws Exception {
|
||||
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testRecoverExpiredMessages() throws Exception {
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||
"failover://tcp://localhost:61616");
|
||||
"failover://"+brokerUri);
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -247,7 +249,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
producingThread.join();
|
||||
|
||||
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
||||
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
||||
+ view.getEnqueues().getCount() + ", dequeues: "
|
||||
+ view.getDequeues().getCount() + ", dispatched: "
|
||||
+ view.getDispatched().getCount() + ", inflight: "
|
||||
|
@ -263,7 +265,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
LOG.info("recovering broker");
|
||||
final boolean deleteAllMessages = false;
|
||||
broker = createBroker(deleteAllMessages, 5000);
|
||||
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||
|
@ -273,25 +275,25 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
+ view.getDispatched().getCount() + ", inflight: "
|
||||
+ view.getInflight().getCount() + ", expiries: "
|
||||
+ view.getExpired().getCount());
|
||||
|
||||
|
||||
return view.getMessages().getCount() == 0;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
view = getDestinationStatistics(broker, destination);
|
||||
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
|
||||
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
|
||||
}
|
||||
|
||||
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("localhost");
|
||||
broker.setDestinations(new ActiveMQDestination[]{destination});
|
||||
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
|
||||
adaptor.setDirectory(new File("target/expiredtest-data/"));
|
||||
adaptor.setForceRecoverReferenceStore(true);
|
||||
broker.setPersistenceAdapter(adaptor);
|
||||
|
||||
|
||||
PolicyEntry defaultPolicy = new PolicyEntry();
|
||||
if (useVMCursor) {
|
||||
defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
|
||||
|
@ -302,17 +304,17 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
policyMap.setDefaultEntry(defaultPolicy);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
broker.addConnector("tcp://localhost:61616");
|
||||
broker.addConnector("tcp://localhost:0");
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
return broker;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
connection.stop();
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
connection.stop();
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue