mirror of https://github.com/apache/activemq.git
temporary fix for durable consumer test
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@789626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7cf47287b8
commit
d20da4460d
|
@ -158,8 +158,19 @@ public class DurableConsumerTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void configurePersistence(BrokerService broker) throws Exception {
|
||||||
|
File dataDirFile = new File("target/"+ getName());
|
||||||
|
AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
|
||||||
|
fact.setDataDirectory(dataDirFile);
|
||||||
|
fact.setForceRecoverReferenceStore(true);
|
||||||
|
broker.setPersistenceAdapter(fact.createPersistenceAdapter());
|
||||||
|
}
|
||||||
|
|
||||||
public void testFailover() throws Exception {
|
public void testFailover() throws Exception {
|
||||||
|
|
||||||
|
configurePersistence(broker);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
Thread publisherThread = new Thread( new MessagePublisher() );
|
Thread publisherThread = new Thread( new MessagePublisher() );
|
||||||
publisherThread.start();
|
publisherThread.start();
|
||||||
|
|
||||||
|
@ -179,11 +190,16 @@ public class DurableConsumerTest extends TestCase {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
|
configurePersistence(broker);
|
||||||
|
broker.start();
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
assertEquals(0, exceptions.size());
|
assertEquals(0, exceptions.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConcurrentDurableConsumer() throws Exception {
|
public void testConcurrentDurableConsumer() throws Exception {
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
|
||||||
factory = createConnectionFactory();
|
factory = createConnectionFactory();
|
||||||
final String topicName = getName();
|
final String topicName = getName();
|
||||||
final int numMessages = 500;
|
final int numMessages = 500;
|
||||||
|
@ -265,6 +281,9 @@ public class DurableConsumerTest extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConsumer() throws Exception{
|
public void testConsumer() throws Exception{
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
|
||||||
factory = createConnectionFactory();
|
factory = createConnectionFactory();
|
||||||
Connection consumerConnection = factory.createConnection();
|
Connection consumerConnection = factory.createConnection();
|
||||||
consumerConnection.setClientID(CONSUMER_NAME);
|
consumerConnection.setClientID(CONSUMER_NAME);
|
||||||
|
@ -274,7 +293,8 @@ public class DurableConsumerTest extends TestCase {
|
||||||
consumerConnection.start();
|
consumerConnection.start();
|
||||||
consumerConnection.close();
|
consumerConnection.close();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker =createBroker(false);
|
broker = createBroker(false);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
Connection producerConnection = factory.createConnection();
|
Connection producerConnection = factory.createConnection();
|
||||||
|
|
||||||
|
@ -292,14 +312,15 @@ public class DurableConsumerTest extends TestCase {
|
||||||
}
|
}
|
||||||
producerConnection.close();
|
producerConnection.close();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker =createBroker(false);
|
broker = createBroker(false);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
consumerConnection = factory.createConnection();
|
consumerConnection = factory.createConnection();
|
||||||
consumerConnection.setClientID(CONSUMER_NAME);
|
consumerConnection.setClientID(CONSUMER_NAME);
|
||||||
|
consumerConnection.start();
|
||||||
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
|
consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
|
||||||
consumerConnection.start();
|
|
||||||
for (int i =0; i < COUNT;i++) {
|
for (int i =0; i < COUNT;i++) {
|
||||||
Message msg = consumer.receive(5000);
|
Message msg = consumer.receive(5000);
|
||||||
assertNotNull("Missing message: "+i, msg);
|
assertNotNull("Missing message: "+i, msg);
|
||||||
|
@ -342,17 +363,11 @@ public class DurableConsumerTest extends TestCase {
|
||||||
protected BrokerService createBroker(boolean deleteStore) throws Exception {
|
protected BrokerService createBroker(boolean deleteStore) throws Exception {
|
||||||
BrokerService answer = new BrokerService();
|
BrokerService answer = new BrokerService();
|
||||||
configureBroker(answer,deleteStore);
|
configureBroker(answer,deleteStore);
|
||||||
answer.start();
|
|
||||||
return answer;
|
return answer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
|
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
|
||||||
File dataDirFile = new File("target/"+ getName());
|
|
||||||
AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
|
|
||||||
fact.setDataDirectory(dataDirFile);
|
|
||||||
fact.setForceRecoverReferenceStore(true);
|
|
||||||
answer.setPersistenceAdapter(fact.createPersistenceAdapter());
|
|
||||||
answer.setDeleteAllMessagesOnStartup(deleteStore);
|
answer.setDeleteAllMessagesOnStartup(deleteStore);
|
||||||
answer.addConnector(bindAddress);
|
answer.addConnector(bindAddress);
|
||||||
answer.setUseShutdownHook(false);
|
answer.setUseShutdownHook(false);
|
||||||
|
|
Loading…
Reference in New Issue