https://issues.apache.org/jira/browse/AMQ-6286 - variant on test with n consumers sharing a session

This commit is contained in:
gtully 2016-05-13 15:50:13 +01:00
parent 3c0a4d960e
commit d7b5a62bb0
1 changed files with 27 additions and 6 deletions

View File

@ -47,15 +47,37 @@ public class QueueOrderSingleTransactedConsumerTest {
publishMessages(100); publishMessages(100);
consumeVerifyOrderAndRollback(20); consumeVerifyOrderRollback(20);
consumeVerifyOrderAndRollback(10); consumeVerifyOrderRollback(10);
consumeVerifyOrderAndRollback(5); consumeVerifyOrderRollback(5);
} }
private void consumeVerifyOrderAndRollback(final int num) throws Exception { @Test
public void testSingleSessionXConsumerTxRepeat() throws Exception {
publishMessages(100);
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
connection.start(); connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumeVerifyOrder(session, 20);
session.rollback();
consumeVerifyOrder(session, 10);
session.rollback();
consumeVerifyOrder(session, 5);
session.commit();
}
private void consumeVerifyOrderRollback(final int num) throws Exception {
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumeVerifyOrder(session, num);
session.rollback();
connection.close();
}
private void consumeVerifyOrder(Session session, final int num) throws Exception {
MessageConsumer messageConsumer = session.createConsumer(dest); MessageConsumer messageConsumer = session.createConsumer(dest);
for (int i=0; i<num; ) { for (int i=0; i<num; ) {
Message message = messageConsumer.receive(4000); Message message = messageConsumer.receive(4000);
@ -64,8 +86,7 @@ public class QueueOrderSingleTransactedConsumerTest {
i++; i++;
} }
} }
session.rollback(); messageConsumer.close();
connection.close();
} }
private void publishMessages(int num) throws Exception { private void publishMessages(int num) throws Exception {