AMQ-6086 - variant with broker.schedulerSupport=true; same fix with additional test

This commit is contained in:
gtully 2018-09-25 16:29:24 +01:00
parent 02c1e6d8f2
commit cae66f5d37
2 changed files with 71 additions and 2 deletions

View File

@ -1768,7 +1768,7 @@ public class BrokerService implements Service {
* @return the tempDataStore
*/
public synchronized PListStore getTempDataStore() {
if (tempDataStore == null) {
if (tempDataStore == null && !hasStartException()) {
if (!isPersistent()) {
return null;
}
@ -1931,7 +1931,7 @@ public class BrokerService implements Service {
// If the user configured their own we use it even if persistence is disabled since
// we don't know anything about their implementation.
if (jobSchedulerStore == null) {
if (jobSchedulerStore == null && !hasStartException()) {
if (!isPersistent()) {
this.jobSchedulerStore = new InMemoryJobSchedulerStore();

View File

@ -49,6 +49,9 @@ import javax.management.loading.ClassLoaderRepository;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.ServiceSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -367,4 +370,70 @@ public class StartAndConcurrentStopBrokerTest {
}
@Test(timeout = 30000)
public void testStopWithScheduler() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setSchedulerSupport(true);
ExecutorService executor = Executors.newFixedThreadPool(1);
for (int sleepT : new int[] {30, 40, 50}) {
final int sleepMillis = sleepT;
final CountDownLatch stopperStarted = new CountDownLatch(1);
final CountDownLatch stopperDone = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
try {
stopperStarted.countDown();
int timeOutMillis = 4000;
PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
if (persistenceAdapter != null && persistenceAdapter instanceof LockableServiceSupport) {
LockableServiceSupport lockableServiceSupport = (LockableServiceSupport) persistenceAdapter;
if (lockableServiceSupport.isUseLock() && lockableServiceSupport.getLocker() instanceof ServiceSupport) {
ServiceSupport lockService = ((ServiceSupport) lockableServiceSupport.getLocker());
while (!lockService.isStarted() && timeOutMillis > 0) {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
timeOutMillis = timeOutMillis - sleepMillis;
}
LOG.info("broker stop on active async start, blocked on store lock:" + lockService.isStarted());
brokerService.stop();
}
}
stopperDone.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("Checker started", stopperStarted.await(20, TimeUnit.SECONDS));
try {
brokerService.start();
// Expect start to fail with brokerStopped Exception but not a guarantee
} catch (BrokerStoppedException expected) {
expected.printStackTrace();
} finally {
brokerService.stop();
}
assertTrue("Stopper done", stopperDone.await(20, TimeUnit.SECONDS));
// ok to start a new
BrokerService sanity = new BrokerService();
sanity.setSchedulerSupport(true);
sanity.start();
sanity.stop();
}
}
}