This closes #925
This commit is contained in:
commit
ce4f9b5ff6
|
@ -385,6 +385,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
|||
LOG.info("Received: " + msg);
|
||||
Assert.assertNull("no messges left dangling but got: " + msg, msg);
|
||||
connection.close();
|
||||
proxy.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -857,6 +858,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
|||
broker.stop();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
|
||||
Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
|
||||
} finally {
|
||||
connection.close();
|
||||
|
@ -872,45 +874,57 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
|||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
|
||||
configureConnectionFactory(cf);
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Queue destination = producerSession.createQueue(QUEUE_NAME);
|
||||
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageConsumer consumer = consumerSession.createConsumer(destination);
|
||||
try {
|
||||
connection.start();
|
||||
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Queue destination = producerSession.createQueue(QUEUE_NAME);
|
||||
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageConsumer consumer = consumerSession.createConsumer(destination);
|
||||
|
||||
produceMessage(producerSession, destination);
|
||||
Message msg = consumer.receive(20000);
|
||||
if (msg == null) {
|
||||
AutoFailTestSupport.dumpAllThreads("missing-");
|
||||
}
|
||||
Assert.assertNotNull("got message just produced", msg);
|
||||
|
||||
broker.stop();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
|
||||
final CountDownLatch commitDone = new CountDownLatch(1);
|
||||
// will block pending re-deliveries
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("doing async commit...");
|
||||
try {
|
||||
consumerSession.commit();
|
||||
commitDone.countDown();
|
||||
} catch (JMSException ignored) {
|
||||
}
|
||||
produceMessage(producerSession, destination);
|
||||
Message msg = consumer.receive(20000);
|
||||
if (msg == null) {
|
||||
AutoFailTestSupport.dumpAllThreads("missing-");
|
||||
}
|
||||
}.start();
|
||||
Assert.assertNotNull("got message just produced", msg);
|
||||
|
||||
broker.stop();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
broker.stop();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
|
||||
Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
|
||||
final CountDownLatch commitDone = new CountDownLatch(1);
|
||||
final CountDownLatch gotException = new CountDownLatch(1);
|
||||
// will block pending re-deliveries
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("doing async commit...");
|
||||
try {
|
||||
consumerSession.commit();
|
||||
commitDone.countDown();
|
||||
}
|
||||
catch (JMSException ignored) {
|
||||
System.out.println("--->err: got exfeption:");
|
||||
ignored.printStackTrace();
|
||||
gotException.countDown();
|
||||
}
|
||||
finally {
|
||||
commitDone.countDown();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
Assert.assertNull("should not get committed message", consumer.receive(5000));
|
||||
connection.close();
|
||||
broker.stop();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
|
||||
Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
|
||||
Assert.assertTrue("got exception on commit", gotException.await(30, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertNotNull("should get failed committed message", consumer.receive(5000));
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue