fix unit tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1517076 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-08-23 22:18:16 +00:00
parent 174fe15b30
commit b3c132780b
5 changed files with 100 additions and 81 deletions

View File

@ -53,6 +53,7 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
@ -118,7 +119,7 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.setPersistent(true);
answer.getManagementContext().setCreateConnector(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@ -51,6 +52,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
@ -88,6 +90,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -119,6 +122,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
session.commit();
@ -158,6 +162,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(NUMBER);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
@ -216,13 +221,13 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
producer.send(message);
producer.close();
}
public void testJobSchedulerStoreUsage() throws Exception {
// Shrink the store limit down so we get the producer to block
broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection();
conn.start();
@ -234,24 +239,25 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
Message message = super.createMessage(i);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
return message;
}
}
};
producer.setMessageCount(100);
producer.start();
MessageConsumer consumer = sess.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(100);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
});
// wait for the producer to block, which should happen immediately, and also wait long
// wait for the producer to block, which should happen immediately, and also wait long
// enough for the delay to elapse. We should see no deliveries as the send should block
// on the first message.
Thread.sleep(10000l);
assertEquals(100, latch.getCount());
// Increase the store limit so the producer unblocks. Everything should enqueue at this point.
@ -262,16 +268,17 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
// Make sure we sent all the messages we expected to send
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, 20000l);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
// Make sure we got all the messages we expected to get
latch.await(20000l, TimeUnit.MILLISECONDS);
assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
}
@ -293,7 +300,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);

View File

@ -58,6 +58,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -96,6 +97,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -105,9 +107,10 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
MessageConsumer browser = session.createConsumer(browseDest);
final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
browser.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
}
});
@ -120,7 +123,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
producer.send(request);
@ -158,6 +161,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -167,9 +171,10 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
MessageConsumer browser = session.createConsumer(browseDest);
final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
browser.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
}
});
@ -215,6 +220,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT + 2);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -224,9 +230,10 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
MessageConsumer browser = session.createConsumer(browseDest);
final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
browser.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
browsedLatch.countDown();
LOG.debug("Scheduled Message Browser got Message: " + message);
}
});
@ -276,6 +283,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
@ -290,24 +298,24 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
// Send the browse request
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
request.setJMSReplyTo(browseDest);
producer.send(request);
// Browse all the Scheduled Messages.
for (int i = 0; i < COUNT; ++i) {
Message message = browser.receive(2000);
assertNotNull(message);
Message message = browser.receive(2000);
assertNotNull(message);
try{
Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
producer.send(remove);
} catch(Exception e) {
}
try{
Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
producer.send(remove);
} catch(Exception e) {
}
}
// now check that they all got removed and are not delivered.
@ -325,17 +333,17 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
MessageProducer producer = session.createProducer(management);
try{
try{
// Send the remove request
Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
producer.send(remove);
} catch(Exception e) {
fail("Caught unexpected exception during remove of unscheduled message.");
}
// Send the remove request
Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
producer.send(remove);
} catch(Exception e) {
fail("Caught unexpected exception during remove of unscheduled message.");
}
}
public void testBrowseWithSelector() throws Exception {
@ -377,7 +385,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
protected void scheduleMessage(Connection connection, long delay) throws Exception {
scheduleMessage(connection, delay, 1);
scheduleMessage(connection, delay, 1);
}
protected void scheduleMessage(Connection connection, long delay, int count) throws Exception {
@ -387,7 +395,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
for(int i = 0; i < count; ++i ) {
producer.send(message);
producer.send(message);
}
producer.close();
@ -411,7 +419,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);

View File

@ -16,48 +16,50 @@
*/
package org.apache.activemq.broker.scheduler;
import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
public class JobSchedulerStoreTest extends TestCase {
public void testRestart() throws Exception {
JobSchedulerStore store = new JobSchedulerStoreImpl();
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store.setDirectory(directory);
final int NUMBER = 1000;
store.start();
List<ByteSequence>list = new ArrayList<ByteSequence>();
for (int i = 0; i < NUMBER;i++ ) {
public void testRestart() throws Exception {
JobSchedulerStore store = new JobSchedulerStoreImpl();
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store.setDirectory(directory);
final int NUMBER = 1000;
store.start();
List<ByteSequence>list = new ArrayList<ByteSequence>();
for (int i = 0; i < NUMBER;i++ ) {
ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes());
list.add(buff);
list.add(buff);
}
JobScheduler js = store.getJobScheduler("test");
int count = 0;
long startTime = 10 * 60 * 1000; long period = startTime;
for (ByteSequence job:list) {
js.schedule("id:"+(count++), job, "", startTime, period, -1);
}
List<Job>test = js.getAllJobs();
assertEquals(list.size(),test.size());
store.stop();
store.start();
js = store.getJobScheduler("test");
test = js.getAllJobs();
assertEquals(list.size(),test.size());
for (int i = 0; i < list.size();i++) {
String orig = new String(list.get(i).getData());
String payload = new String(test.get(i).getPayload());
assertEquals(orig,payload);
}
}
JobScheduler js = store.getJobScheduler("test");
js.startDispatching();
int count = 0;
long startTime = 10 * 60 * 1000; long period = startTime;
for (ByteSequence job:list) {
js.schedule("id:"+(count++), job, "", startTime, period, -1);
}
List<Job>test = js.getAllJobs();
assertEquals(list.size(),test.size());
store.stop();
store.start();
js = store.getJobScheduler("test");
test = js.getAllJobs();
assertEquals(list.size(),test.size());
for (int i = 0; i < list.size();i++) {
String orig = new String(list.get(i).getData());
String payload = new String(test.get(i).getPayload());
assertEquals(orig,payload);
}
}
}

View File

@ -247,6 +247,7 @@ public class JobSchedulerTest {
store.setDirectory(directory);
store.start();
scheduler = store.getJobScheduler("test");
scheduler.startDispatching();
}
@After