fix failing RecoverExpiredMessagesTest - remove duplicate ref to secedualler task in jdbc pa inheritance and fix persistence adapter ref in test

This commit is contained in:
gtully 2014-12-08 13:59:38 +00:00
parent 919099b419
commit bc3587cfc9
4 changed files with 15 additions and 13 deletions

View File

@ -39,7 +39,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
Locker locker;
long lockKeepAlivePeriod = 0;
private ScheduledFuture<?> keepAliveTicket;
private ScheduledThreadPoolExecutor clockDaemon;
protected ScheduledThreadPoolExecutor clockDaemon;
protected BrokerService brokerService;
/**
@ -113,8 +113,9 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
if (locker != null) {
getLocker().stop();
}
ThreadPoolUtils.shutdown(clockDaemon);
}
ThreadPoolUtils.shutdown(clockDaemon);
clockDaemon = null;
}
protected void keepLockAlive() {
@ -162,6 +163,10 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
return clockDaemon;
}
public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
this.clockDaemon = clockDaemon;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;

View File

@ -85,7 +85,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
private Statements statements;
private JDBCAdapter adapter;
private MemoryTransactionStore transactionStore;
private ScheduledThreadPoolExecutor clockDaemon;
private ScheduledFuture<?> cleanupTicket;
private int cleanupPeriod = 1000 * 60 * 5;
private boolean useExternalMessageReferences;
@ -337,8 +336,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
cleanupTicket.cancel(true);
cleanupTicket = null;
}
ThreadPoolUtils.shutdown(clockDaemon);
}
public void cleanup() {
@ -363,10 +360,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
this.clockDaemon = clockDaemon;
}
@Override
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
if (clockDaemon == null) {

View File

@ -63,12 +63,12 @@ public class BrokerTestSupport extends CombinationTestSupport {
public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
protected RegionBroker regionBroker;
protected BrokerService broker;
public BrokerService broker;
protected long idGenerator;
protected int msgIdGenerator;
protected int txGenerator;
protected int tempDestGenerator;
protected PersistenceAdapter persistenceAdapter;
public PersistenceAdapter persistenceAdapter;
protected String queueName = "TEST";

View File

@ -34,15 +34,16 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
final ArrayList<String> expected = new ArrayList<String>();
final ActiveMQDestination destination = new ActiveMQQueue("TEST");
public PendingQueueMessageStoragePolicy queuePendingPolicy;
public PersistenceAdapter persistenceAdapter;
@Override
protected void setUp() throws Exception {
@ -52,13 +53,16 @@ public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
public void initCombosForTestRecovery() throws Exception {
addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()});
addCombinationValues("persistenceAdapter", new PersistenceAdapter[] {new KahaDBPersistenceAdapter(), new JDBCPersistenceAdapter()});
addCombinationValues("persistenceAdapter", new PersistenceAdapter[] {new KahaDBPersistenceAdapter(),
// need to supply the dataSource as it is used in parameter matching via the toString
new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())});
}
public void testRecovery() throws Exception {
sendSomeMessagesThatWillExpireIn5AndThenOne();
broker.stop();
broker.waitUntilStopped();
TimeUnit.SECONDS.sleep(6);
broker = createRestartedBroker();
broker.start();