mirror of https://github.com/apache/activemq.git
resolve intermittent faiure - make deterministic intervetion and assertions
This commit is contained in:
parent
b53d8ea295
commit
4648dbee41
|
@ -79,6 +79,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
// method that can be configured to throw a SQL exception on demand
|
||||
jdbc = new TestJDBCPersistenceAdapter();
|
||||
jdbc.setDataSource(embeddedDataSource);
|
||||
jdbc.setCleanupPeriod(0);
|
||||
testTransactionContext = new TestTransactionContext(jdbc);
|
||||
|
||||
jdbc.setLockKeepAlivePeriod(1000l);
|
||||
|
@ -121,15 +122,19 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
|
||||
sendMessage(MY_TEST_Q, failoverTransportURL);
|
||||
|
||||
List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL);
|
||||
|
||||
//check db contents
|
||||
ArrayList<Long> dbSeq = dbMessageCount();
|
||||
LOG.info("*** after send: db contains message seq " +dbSeq );
|
||||
|
||||
LOG.debug("*** db contains message seq " +dbSeq );
|
||||
List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL);
|
||||
|
||||
assertEquals("number of consumed messages",3,consumedMessages.size());
|
||||
|
||||
//check db contents
|
||||
dbSeq = dbMessageCount();
|
||||
LOG.info("*** after consume - db contains message seq " + dbSeq);
|
||||
|
||||
assertEquals("number of messages in DB after test",0,dbSeq.size());
|
||||
assertEquals("number of consumed messages",3,consumedMessages.size());
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
|
@ -158,7 +163,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
MessageConsumer messageConsumer = session.createConsumer(destination);
|
||||
|
||||
while(true){
|
||||
TextMessage textMessage= (TextMessage) messageConsumer.receive(100);
|
||||
TextMessage textMessage= (TextMessage) messageConsumer.receive(4000);
|
||||
LOG.debug("*** consumed Messages :"+textMessage);
|
||||
|
||||
if(textMessage==null){
|
||||
|
@ -176,7 +181,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
}
|
||||
|
||||
public void sendMessage(String queue, String transportURL)
|
||||
throws JMSException {
|
||||
throws Exception {
|
||||
Connection connection = null;
|
||||
|
||||
try {
|
||||
|
@ -193,32 +198,18 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
|
||||
TextMessage m = session.createTextMessage("1");
|
||||
|
||||
testTransactionContext.throwSQLException = false;
|
||||
jdbc.throwSQLException = false;
|
||||
|
||||
|
||||
LOG.debug("*** send message 1 to broker...");
|
||||
producer.send(m);
|
||||
|
||||
testTransactionContext.throwSQLException = true;
|
||||
jdbc.throwSQLException = true;
|
||||
|
||||
// trigger SQL exception in transactionContext
|
||||
LOG.debug("*** send message 2 to broker");
|
||||
m.setText("2");
|
||||
|
||||
// need to reset the flag in a seperate thread during the send
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
testTransactionContext.throwSQLException = false;
|
||||
jdbc.throwSQLException = false;
|
||||
}
|
||||
}, 2 , TimeUnit.SECONDS);
|
||||
|
||||
producer.send(m);
|
||||
|
||||
//check db contents
|
||||
ArrayList<Long> dbSeq = dbMessageCount();
|
||||
LOG.info("*** after send 2 - db contains message seq " + dbSeq);
|
||||
assertEquals("number of messages in DB after send 2",2,dbSeq.size());
|
||||
|
||||
LOG.debug("*** send message 3 to broker");
|
||||
m.setText("3");
|
||||
|
@ -266,25 +257,14 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
*/
|
||||
|
||||
public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
|
||||
|
||||
public boolean throwSQLException;
|
||||
|
||||
public TransactionContext getTransactionContext() throws IOException {
|
||||
return testTransactionContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkpoint(boolean sync) throws IOException {
|
||||
if (throwSQLException) {
|
||||
throw new IOException("checkpoint failed");
|
||||
}
|
||||
super.checkpoint(sync);
|
||||
}
|
||||
}
|
||||
|
||||
public class TestTransactionContext extends TransactionContext {
|
||||
|
||||
public boolean throwSQLException;
|
||||
private int count;
|
||||
|
||||
public TestTransactionContext(
|
||||
JDBCPersistenceAdapter jdbcPersistenceAdapter)
|
||||
|
@ -293,14 +273,14 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
|
|||
}
|
||||
|
||||
public void executeBatch() throws SQLException {
|
||||
//call
|
||||
super.executeBatch();
|
||||
count++;
|
||||
LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch"));
|
||||
|
||||
if (throwSQLException){
|
||||
throw new SQLException("TEST SQL EXCEPTION from executeBatch after super. execution");
|
||||
// throw on second add message
|
||||
if (count == 16){
|
||||
throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue