renamed to follow Test naming convention

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@903487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-01-26 23:31:06 +00:00
parent 8eeedce809
commit 8b1e16de48
2 changed files with 118 additions and 117 deletions

View File

@ -27,7 +27,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
public class AMQ2571 extends EmbeddedBrokerTestSupport { public class AMQ2571Test extends EmbeddedBrokerTestSupport {
public void testTempQueueClosing() { public void testTempQueueClosing() {
try { try {

View File

@ -50,64 +50,63 @@ import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.util.ThreadTracker; import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $ A Test case for AMQ-1479
* A Test case for AMQ-1479
*/ */
public class DurableConsumerTest extends CombinationTestSupport { public class DurableConsumerTest extends CombinationTestSupport{
private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class); private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class);
private static int COUNT = 1024*10; private static int COUNT = 1024 * 10;
private static String CONSUMER_NAME = "DURABLE_TEST"; private static String CONSUMER_NAME = "DURABLE_TEST";
protected BrokerService broker; protected BrokerService broker;
protected String bindAddress="tcp://localhost:61616";
protected byte[] payload = new byte[1024*32]; protected String bindAddress = "tcp://localhost:61616";
protected byte[] payload = new byte[1024 * 32];
protected ConnectionFactory factory; protected ConnectionFactory factory;
protected Vector<Exception> exceptions = new Vector<Exception>(); protected Vector<Exception> exceptions = new Vector<Exception>();
private static final String TOPIC_NAME = "failoverTopic"; private static final String TOPIC_NAME = "failoverTopic";
private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
public boolean useDedicatedTaskRunner = false; public boolean useDedicatedTaskRunner = false;
private class SimpleTopicSubscriber implements MessageListener, ExceptionListener { private class SimpleTopicSubscriber implements MessageListener,ExceptionListener{
private TopicConnection topicConnection = null; private TopicConnection topicConnection = null;
public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) { public SimpleTopicSubscriber(String connectionURL,String clientId,String topicName) {
ActiveMQConnectionFactory topicConnectionFactory = null; ActiveMQConnectionFactory topicConnectionFactory = null;
TopicSession topicSession = null; TopicSession topicSession = null;
Topic topic = null; Topic topic = null;
TopicSubscriber topicSubscriber = null; TopicSubscriber topicSubscriber = null;
topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL); topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
try { try {
topic = new ActiveMQTopic(topicName); topic = new ActiveMQTopic(topicName);
topicConnection = topicConnectionFactory.createTopicConnection(); topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setClientID((clientId)); topicConnection.setClientID((clientId));
topicConnection.start(); topicConnection.start();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId)); topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
topicSubscriber.setMessageListener(this); topicSubscriber.setMessageListener(this);
} catch (JMSException e) { } catch (JMSException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public void onMessage(Message arg0) { public void onMessage(Message arg0){
} }
public void closeConnection() { public void closeConnection(){
if (topicConnection != null) { if (topicConnection != null) {
try { try {
topicConnection.close(); topicConnection.close();
@ -115,39 +114,38 @@ public class DurableConsumerTest extends CombinationTestSupport {
} }
} }
} }
public void onException(JMSException exception) { public void onException(JMSException exception){
exceptions.add(exception); exceptions.add(exception);
} }
} }
private class MessagePublisher implements Runnable { private class MessagePublisher implements Runnable{
private boolean shouldPublish = true; private boolean shouldPublish = true;
public void run() { public void run(){
TopicConnectionFactory topicConnectionFactory = null; TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null; TopicConnection topicConnection = null;
TopicSession topicSession = null; TopicSession topicSession = null;
Topic topic = null; Topic topic = null;
TopicPublisher topicPublisher = null; TopicPublisher topicPublisher = null;
Message message = null; Message message = null;
topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
try { try {
topic = new ActiveMQTopic(TOPIC_NAME); topic = new ActiveMQTopic(TOPIC_NAME);
topicConnection = topicConnectionFactory.createTopicConnection(); topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic); topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createMessage(); message = topicSession.createMessage();
} catch( Exception ex ) { } catch (Exception ex) {
exceptions.add(ex); exceptions.add(ex);
} }
while (shouldPublish) { while (shouldPublish) {
try { try {
topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000); topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
} catch (JMSException ex) { } catch (JMSException ex) {
exceptions.add(ex); exceptions.add(ex);
} }
try { try {
Thread.sleep(1); Thread.sleep(1);
@ -157,34 +155,34 @@ public class DurableConsumerTest extends CombinationTestSupport {
} }
} }
private void configurePersistence(BrokerService broker) throws Exception { private void configurePersistence(BrokerService broker) throws Exception{
File dataDirFile = new File("target/"+ getName()); File dataDirFile = new File("target/" + getName());
AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory(); AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
fact.setDataDirectory(dataDirFile); fact.setDataDirectory(dataDirFile);
fact.setForceRecoverReferenceStore(true); fact.setForceRecoverReferenceStore(true);
broker.setPersistenceAdapter(fact.createPersistenceAdapter()); broker.setPersistenceAdapter(fact.createPersistenceAdapter());
} }
public void testFailover() throws Exception { public void testFailover() throws Exception{
configurePersistence(broker); configurePersistence(broker);
broker.start(); broker.start();
Thread publisherThread = new Thread( new MessagePublisher() ); Thread publisherThread = new Thread(new MessagePublisher());
publisherThread.start(); publisherThread.start();
for( int i = 0; i < 100; i++ ) { for (int i = 0; i < 100; i++) {
final int id = i; final int id = i;
Thread thread = new Thread( new Runnable() { Thread thread = new Thread(new Runnable(){
public void run() { public void run(){
new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME); new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
} }
} ); });
thread.start(); thread.start();
} }
Thread.sleep(5000); Thread.sleep(5000);
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false);
@ -196,41 +194,38 @@ public class DurableConsumerTest extends CombinationTestSupport {
// makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
// with use dedicatedTaskRunner=true and produce OOM // with use dedicatedTaskRunner=true and produce OOM
public void initCombosForTestConcurrentDurableConsumer() { public void initCombosForTestConcurrentDurableConsumer(){
addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE}); addCombinationValues("useDedicatedTaskRunner", new Object[] { Boolean.TRUE, Boolean.FALSE });
} }
public void testConcurrentDurableConsumer() throws Exception { public void testConcurrentDurableConsumer() throws Exception{
broker.start(); broker.start();
factory = createConnectionFactory(); factory = createConnectionFactory();
final String topicName = getName(); final String topicName = getName();
final int numMessages = 500; final int numMessages = 500;
int numConsumers = 20; int numConsumers = 1;
final CountDownLatch counsumerStarted = new CountDownLatch(0); final CountDownLatch counsumerStarted = new CountDownLatch(0);
final AtomicInteger receivedCount = new AtomicInteger(); final AtomicInteger receivedCount = new AtomicInteger();
Runnable consumer = new Runnable() { Runnable consumer = new Runnable(){
public void run() { public void run(){
final String consumerName = Thread.currentThread().getName(); final String consumerName = Thread.currentThread().getName();
int acked = 0; int acked = 0;
int received = 0; int received = 0;
try { try {
while (acked < numMessages/2) { while (acked < numMessages / 2) {
// take one message and close, ack on occasion // take one message and close, ack on occasion
Connection consumerConnection = factory.createConnection(); Connection consumerConnection = factory.createConnection();
((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false); ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
consumerConnection.setClientID(consumerName); consumerConnection.setClientID(consumerName);
Session consumerSession = consumerConnection.createSession(false, Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session.CLIENT_ACKNOWLEDGE);
Topic topic = consumerSession.createTopic(topicName); Topic topic = consumerSession.createTopic(topicName);
consumerConnection.start(); consumerConnection.start();
MessageConsumer consumer = consumerSession MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName);
.createDurableSubscriber(topic, consumerName);
counsumerStarted.countDown(); counsumerStarted.countDown();
Message msg = null; Message msg = null;
do { do {
@ -243,7 +238,7 @@ public class DurableConsumerTest extends CombinationTestSupport {
} }
} }
} while (msg == null); } while (msg == null);
consumerConnection.close(); consumerConnection.close();
} }
assertTrue(received >= acked); assertTrue(received >= acked);
@ -255,24 +250,24 @@ public class DurableConsumerTest extends CombinationTestSupport {
}; };
ExecutorService executor = Executors.newFixedThreadPool(numConsumers); ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i=0; i<numConsumers ; i++) { for (int i = 0; i < numConsumers; i++) {
executor.execute(consumer); executor.execute(consumer);
} }
assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS)); assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
Connection producerConnection = factory.createConnection(); Connection producerConnection = factory.createConnection();
((ActiveMQConnection)producerConnection).setWatchTopicAdvisories(false); ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = producerSession.createTopic(topicName); Topic topic = producerSession.createTopic(topicName);
MessageProducer producer = producerSession.createProducer(topic); MessageProducer producer = producerSession.createProducer(topic);
producerConnection.start(); producerConnection.start();
for (int i =0; i < numMessages; i++) { for (int i = 0; i < numMessages; i++) {
BytesMessage msg = producerSession.createBytesMessage(); BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(payload); msg.writeBytes(payload);
producer.send(msg); producer.send(msg);
if (i != 0 && i%100==0) { if (i != 0 && i % 100 == 0) {
LOG.info("Sent msg " + i); LOG.info("Sent msg " + i);
} }
} }
@ -280,35 +275,35 @@ public class DurableConsumerTest extends CombinationTestSupport {
executor.shutdown(); executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS); executor.awaitTermination(30, TimeUnit.SECONDS);
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition(){
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception{
return receivedCount.get() > numMessages; return receivedCount.get() > numMessages;
} }
}, 60*1000); }, 60 * 1000);
assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages); assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages);
assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
} }
public void testConsumerRecover() throws Exception { public void testConsumerRecover() throws Exception{
doTestConsumer(true); doTestConsumer(true);
} }
public void testConsumer() throws Exception { public void testConsumer() throws Exception{
doTestConsumer(false); doTestConsumer(false);
} }
public void doTestConsumer(boolean forceRecover) throws Exception{ public void doTestConsumer(boolean forceRecover) throws Exception{
if (forceRecover) { if (forceRecover) {
configurePersistence(broker); configurePersistence(broker);
} }
broker.start(); broker.start();
factory = createConnectionFactory(); factory = createConnectionFactory();
Connection consumerConnection = factory.createConnection(); Connection consumerConnection = factory.createConnection();
consumerConnection.setClientID(CONSUMER_NAME); consumerConnection.setClientID(CONSUMER_NAME);
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = consumerSession.createTopic(getClass().getName()); Topic topic = consumerSession.createTopic(getClass().getName());
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
consumerConnection.start(); consumerConnection.start();
consumerConnection.close(); consumerConnection.close();
@ -320,16 +315,16 @@ public class DurableConsumerTest extends CombinationTestSupport {
broker.start(); broker.start();
Connection producerConnection = factory.createConnection(); Connection producerConnection = factory.createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(topic); MessageProducer producer = producerSession.createProducer(topic);
producerConnection.start(); producerConnection.start();
for (int i =0; i < COUNT;i++) { for (int i = 0; i < COUNT; i++) {
BytesMessage msg = producerSession.createBytesMessage(); BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(payload); msg.writeBytes(payload);
producer.send(msg); producer.send(msg);
if (i != 0 && i%1000==0) { if (i != 0 && i % 1000 == 0) {
LOG.info("Sent msg " + i); LOG.info("Sent msg " + i);
} }
} }
@ -345,73 +340,79 @@ public class DurableConsumerTest extends CombinationTestSupport {
consumerConnection.setClientID(CONSUMER_NAME); consumerConnection.setClientID(CONSUMER_NAME);
consumerConnection.start(); consumerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
for (int i =0; i < COUNT;i++) { for (int i = 0; i < COUNT; i++) {
Message msg = consumer.receive(5000); Message msg = consumer.receive(5000);
assertNotNull("Missing message: "+i, msg); assertNotNull("Missing message: " + i, msg);
if (i != 0 && i%1000==0) { if (i != 0 && i % 1000 == 0) {
LOG.info("Received msg " + i); LOG.info("Received msg " + i);
} }
} }
consumerConnection.close(); consumerConnection.close();
} }
protected void setUp() throws Exception { protected void setUp() throws Exception{
if (broker == null) { if (broker == null) {
broker = createBroker(true); broker = createBroker(true);
} }
super.setUp(); super.setUp();
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception{
super.tearDown(); super.tearDown();
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
broker = null; broker = null;
} }
} }
protected Topic creatTopic(Session s, String destinationName) throws JMSException { protected Topic creatTopic(Session s,String destinationName) throws JMSException{
return s.createTopic(destinationName); return s.createTopic(destinationName);
} }
/** /**
* Factory method to create a new broker * Factory method to create a new broker
* *
* @throws Exception * @throws Exception
*/ */
protected BrokerService createBroker(boolean deleteStore) throws Exception { protected BrokerService createBroker(boolean deleteStore) throws Exception{
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
configureBroker(answer,deleteStore); configureBroker(answer, deleteStore);
return answer; return answer;
} }
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
answer.setDeleteAllMessagesOnStartup(deleteStore); answer.setDeleteAllMessagesOnStartup(deleteStore);
KahaDBStore kaha = new KahaDBStore();
File directory = new File("target/activemq-data/kahadb");
if (deleteStore) {
IOHelper.deleteChildren(directory);
}
kaha.setDirectory(directory);
answer.setPersistenceAdapter(kaha);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setUseShutdownHook(false); answer.setUseShutdownHook(false);
answer.setUseJmx(false); answer.setUseJmx(false);
answer.setAdvisorySupport(false); answer.setAdvisorySupport(false);
answer.setDedicatedTaskRunner(useDedicatedTaskRunner); answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner); factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
return factory; return factory;
} }
public static Test suite() { public static Test suite(){
return suite(DurableConsumerTest.class); return suite(DurableConsumerTest.class);
} }
public static void main(String[] args) { public static void main(String[] args){
junit.textui.TestRunner.run(suite()); junit.textui.TestRunner.run(suite());
} }
} }