diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java index 0d14135edc..688d066d2a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java @@ -79,6 +79,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase { // method that can be configured to throw a SQL exception on demand jdbc = new TestJDBCPersistenceAdapter(); jdbc.setDataSource(embeddedDataSource); + jdbc.setCleanupPeriod(0); testTransactionContext = new TestTransactionContext(jdbc); jdbc.setLockKeepAlivePeriod(1000l); @@ -121,15 +122,19 @@ public class TrapMessageInJDBCStoreTest extends TestCase { sendMessage(MY_TEST_Q, failoverTransportURL); - List consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL); - //check db contents ArrayList dbSeq = dbMessageCount(); + LOG.info("*** after send: db contains message seq " +dbSeq ); - LOG.debug("*** db contains message seq " +dbSeq ); + List consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL); + + assertEquals("number of consumed messages",3,consumedMessages.size()); + + //check db contents + dbSeq = dbMessageCount(); + LOG.info("*** after consume - db contains message seq " + dbSeq); assertEquals("number of messages in DB after test",0,dbSeq.size()); - assertEquals("number of consumed messages",3,consumedMessages.size()); broker.stop(); broker.waitUntilStopped(); @@ -158,7 +163,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase { MessageConsumer messageConsumer = session.createConsumer(destination); while(true){ - TextMessage textMessage= (TextMessage) messageConsumer.receive(100); + TextMessage textMessage= (TextMessage) messageConsumer.receive(4000); LOG.debug("*** consumed Messages :"+textMessage); if(textMessage==null){ @@ -176,7 +181,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase { } public void sendMessage(String queue, String transportURL) - throws JMSException { + throws Exception { Connection connection = null; try { @@ -193,32 +198,18 @@ public class TrapMessageInJDBCStoreTest extends TestCase { TextMessage m = session.createTextMessage("1"); - testTransactionContext.throwSQLException = false; - jdbc.throwSQLException = false; - - LOG.debug("*** send message 1 to broker..."); producer.send(m); - testTransactionContext.throwSQLException = true; - jdbc.throwSQLException = true; - // trigger SQL exception in transactionContext LOG.debug("*** send message 2 to broker"); m.setText("2"); + producer.send(m); - // need to reset the flag in a seperate thread during the send - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.schedule(new Runnable() { - @Override - public void run() { - testTransactionContext.throwSQLException = false; - jdbc.throwSQLException = false; - } - }, 2 , TimeUnit.SECONDS); - - producer.send(m); - + //check db contents + ArrayList dbSeq = dbMessageCount(); + LOG.info("*** after send 2 - db contains message seq " + dbSeq); + assertEquals("number of messages in DB after send 2",2,dbSeq.size()); LOG.debug("*** send message 3 to broker"); m.setText("3"); @@ -266,25 +257,14 @@ public class TrapMessageInJDBCStoreTest extends TestCase { */ public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { - - public boolean throwSQLException; - public TransactionContext getTransactionContext() throws IOException { return testTransactionContext; } - - @Override - public void checkpoint(boolean sync) throws IOException { - if (throwSQLException) { - throw new IOException("checkpoint failed"); - } - super.checkpoint(sync); - } } public class TestTransactionContext extends TransactionContext { - public boolean throwSQLException; + private int count; public TestTransactionContext( JDBCPersistenceAdapter jdbcPersistenceAdapter) @@ -293,14 +273,14 @@ public class TrapMessageInJDBCStoreTest extends TestCase { } public void executeBatch() throws SQLException { - //call super.executeBatch(); + count++; + LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch")); - if (throwSQLException){ - throw new SQLException("TEST SQL EXCEPTION from executeBatch after super. execution"); + // throw on second add message + if (count == 16){ + throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count); } - - }