This closes #2207
This commit is contained in:
commit
f980c349a8
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -138,19 +139,7 @@ public class HeuristicXATest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
if (isCommit) {
|
||||
Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
||||
session = sf.createSession(false, false, false);
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
msg = consumer.receive(1000);
|
||||
Assert.assertNotNull(msg);
|
||||
msg.acknowledge();
|
||||
Assert.assertEquals(body, msg.getBodyBuffer().readString());
|
||||
|
||||
session.commit();
|
||||
session.close();
|
||||
assertMessageInQueueThenReceiveAndCheckContent(server, sf);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
@ -213,19 +202,7 @@ public class HeuristicXATest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(0, preparedTransactions.length);
|
||||
|
||||
if (isCommit) {
|
||||
Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
||||
session = sf.createSession(false, false, false);
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
msg = consumer.receive(1000);
|
||||
Assert.assertNotNull(msg);
|
||||
msg.acknowledge();
|
||||
Assert.assertEquals(body, msg.getBodyBuffer().readString());
|
||||
|
||||
session.commit();
|
||||
session.close();
|
||||
assertMessageInQueueThenReceiveAndCheckContent(server, sf);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
@ -303,19 +280,7 @@ public class HeuristicXATest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(0, preparedTransactions.length);
|
||||
|
||||
if (heuristicCommit) {
|
||||
Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
||||
session = sf.createSession(false, false, false);
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
msg = consumer.receive(1000);
|
||||
Assert.assertNotNull(msg);
|
||||
msg.acknowledge();
|
||||
Assert.assertEquals(body, msg.getBodyBuffer().readString());
|
||||
|
||||
session.commit();
|
||||
session.close();
|
||||
assertMessageInQueueThenReceiveAndCheckContent(server, sf);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
@ -345,6 +310,22 @@ public class HeuristicXATest extends ActiveMQTestBase {
|
|||
session.close();
|
||||
}
|
||||
|
||||
private void assertMessageInQueueThenReceiveAndCheckContent(ActiveMQServer server, ClientSessionFactory sf) throws Exception {
|
||||
Wait.assertEquals(1, () -> getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())), 5 * 1000, 100);
|
||||
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
ClientMessage msg = consumer.receive(1000);
|
||||
Assert.assertNotNull(msg);
|
||||
msg.acknowledge();
|
||||
Assert.assertEquals(body, msg.getBodyBuffer().readString());
|
||||
|
||||
session.commit();
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForgetHeuristicCommitAndRestart() throws Exception {
|
||||
doForgetHeuristicCompletedTxAndRestart(true);
|
||||
|
|
Loading…
Reference in New Issue