diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/model/api/ISmartLifecyclePhase.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/model/api/ISmartLifecyclePhase.java new file mode 100644 index 00000000000..00fd2b60510 --- /dev/null +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/model/api/ISmartLifecyclePhase.java @@ -0,0 +1,10 @@ +package ca.uhn.fhir.model.api; + +public interface ISmartLifecyclePhase { + // POST_CONSTRUCT is here as a marker for where post-construct fits into the smart lifecycle. Beans with negative phases + // will be started before @PostConstruct are called + int POST_CONSTRUCT = 0; + + // We want to start scheduled tasks fairly late in the startup process + int SCHEDULER_1000 = 1000; +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java index 987936d8677..aae0c2eb27b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java @@ -25,13 +25,13 @@ import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; -import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; +import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; @@ -319,7 +319,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(BulkDataExportSvcImpl.class.getName()); jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, true, jobDetail); + mySchedulerService.scheduleFixedDelayClustered(REFRESH_INTERVAL, jobDetail); } @Transactional diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/NullScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/NullScheduler.java new file mode 100644 index 00000000000..ef207449b4f --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/NullScheduler.java @@ -0,0 +1,297 @@ +package ca.uhn.fhir.jpa.sched; + +import org.quartz.*; +import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.spi.JobFactory; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; + +class NullScheduler implements Scheduler { + @Override + public String getSchedulerName() { + return null; + } + + @Override + public String getSchedulerInstanceId() { + return null; + } + + @Override + public SchedulerContext getContext() { + return null; + } + + @Override + public void start() { + + } + + @Override + public void startDelayed(int seconds) { + + } + + @Override + public boolean isStarted() { + return false; + } + + @Override + public void standby() { + + } + + @Override + public boolean isInStandbyMode() { + return false; + } + + @Override + public void shutdown() { + + } + + @Override + public void shutdown(boolean waitForJobsToComplete) { + + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public SchedulerMetaData getMetaData() { + return null; + } + + @Override + public List getCurrentlyExecutingJobs() { + return null; + } + + @Override + public void setJobFactory(JobFactory factory) { + + } + + @Override + public ListenerManager getListenerManager() { + return null; + } + + @Override + public Date scheduleJob(JobDetail jobDetail, Trigger trigger) { + return null; + } + + @Override + public Date scheduleJob(Trigger trigger) { + return null; + } + + @Override + public void scheduleJobs(Map> triggersAndJobs, boolean replace) { + + } + + @Override + public void scheduleJob(JobDetail jobDetail, Set triggersForJob, boolean replace) { + + } + + @Override + public boolean unscheduleJob(TriggerKey triggerKey) { + return false; + } + + @Override + public boolean unscheduleJobs(List triggerKeys) { + return false; + } + + @Override + public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) { + return null; + } + + @Override + public void addJob(JobDetail jobDetail, boolean replace) { + + } + + @Override + public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) { + + } + + @Override + public boolean deleteJob(JobKey jobKey) { + return false; + } + + @Override + public boolean deleteJobs(List jobKeys) { + return false; + } + + @Override + public void triggerJob(JobKey jobKey) { + + } + + @Override + public void triggerJob(JobKey jobKey, JobDataMap data) { + + } + + @Override + public void pauseJob(JobKey jobKey) { + + } + + @Override + public void pauseJobs(GroupMatcher matcher) { + + } + + @Override + public void pauseTrigger(TriggerKey triggerKey) { + + } + + @Override + public void pauseTriggers(GroupMatcher matcher) { + + } + + @Override + public void resumeJob(JobKey jobKey) { + + } + + @Override + public void resumeJobs(GroupMatcher matcher) { + + } + + @Override + public void resumeTrigger(TriggerKey triggerKey) { + + } + + @Override + public void resumeTriggers(GroupMatcher matcher) { + + } + + @Override + public void pauseAll() { + + } + + @Override + public void resumeAll() { + + } + + @Override + public List getJobGroupNames() { + return null; + } + + @Override + public Set getJobKeys(GroupMatcher matcher) { + return null; + } + + @Override + public List getTriggersOfJob(JobKey jobKey) { + return null; + } + + @Override + public List getTriggerGroupNames() { + return null; + } + + @Override + public Set getTriggerKeys(GroupMatcher matcher) { + return null; + } + + @Override + public Set getPausedTriggerGroups() { + return null; + } + + @Override + public JobDetail getJobDetail(JobKey jobKey) { + return null; + } + + @Override + public Trigger getTrigger(TriggerKey triggerKey) { + return null; + } + + @Override + public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) { + return null; + } + + @Override + public void resetTriggerFromErrorState(TriggerKey triggerKey) { + + } + + @Override + public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) { + + } + + @Override + public boolean deleteCalendar(String calName) { + return false; + } + + @Override + public Calendar getCalendar(String calName) { + return null; + } + + @Override + public List getCalendarNames() { + return null; + } + + @Override + public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException { + return false; + } + + @Override + public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException { + return false; + } + + @Override + public boolean checkExists(JobKey jobKey) { + return false; + } + + @Override + public boolean checkExists(TriggerKey triggerKey) { + return false; + } + + @Override + public void clear() { + + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java index b996f1f600d..d97a47e0a57 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java @@ -20,28 +20,29 @@ package ca.uhn.fhir.jpa.sched; * #L% */ +import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import ca.uhn.fhir.model.api.ISmartLifecyclePhase; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import com.google.common.collect.Sets; import org.apache.commons.lang3.Validate; -import org.quartz.Calendar; import org.quartz.*; import org.quartz.impl.JobDetailImpl; import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.matchers.GroupMatcher; -import org.quartz.spi.JobFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.EventListener; +import org.springframework.context.ApplicationContext; +import org.springframework.context.SmartLifecycle; import org.springframework.core.env.Environment; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.*; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME; @@ -63,22 +64,24 @@ import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME; * * */ -public class SchedulerServiceImpl implements ISchedulerService { +public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle { public static final String SCHEDULING_DISABLED = "scheduling_disabled"; public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true"; private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class); - private static int ourNextSchedulerId = 0; + private static AtomicInteger ourNextSchedulerId = new AtomicInteger(); private Scheduler myLocalScheduler; private Scheduler myClusteredScheduler; private String myThreadNamePrefix; private boolean myLocalSchedulingEnabled; private boolean myClusteredSchedulingEnabled; - @Autowired - private AutowiringSpringBeanJobFactory mySpringBeanJobFactory; private AtomicBoolean myStopping = new AtomicBoolean(false); @Autowired + private AutowiringSpringBeanJobFactory mySpringBeanJobFactory; + @Autowired private Environment myEnvironment; + @Autowired + private ApplicationContext myApplicationContext; /** * Constructor @@ -114,38 +117,18 @@ public class SchedulerServiceImpl implements ISchedulerService { } @PostConstruct - public void start() throws SchedulerException { + public void create() throws SchedulerException { myLocalScheduler = createLocalScheduler(); myClusteredScheduler = createClusteredScheduler(); myStopping.set(false); } - /** - * We defer startup of executing started tasks until we're sure we're ready for it - * and the startup is completely done - */ - @EventListener - public void contextStarted(ContextRefreshedEvent theEvent) throws SchedulerException { - try { - ourLog.info("Starting task schedulers for context {}", theEvent != null ? theEvent.getApplicationContext().getId() : "null"); - if (myLocalScheduler != null) { - myLocalScheduler.start(); - } - if (myClusteredScheduler != null) { - myClusteredScheduler.start(); - } - } catch (Exception e) { - ourLog.error("Failed to start context", e); - throw new SchedulerException(e); - } - } - private Scheduler createLocalScheduler() throws SchedulerException { if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) { return new NullScheduler(); } Properties localProperties = new Properties(); - localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId++); + localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId.getAndIncrement()); quartzPropertiesCommon(localProperties); quartzPropertiesLocal(localProperties); StdSchedulerFactory factory = new StdSchedulerFactory(); @@ -161,7 +144,7 @@ public class SchedulerServiceImpl implements ISchedulerService { return new NullScheduler(); } Properties clusteredProperties = new Properties(); - clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId++); + clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId.getAndIncrement()); quartzPropertiesCommon(clusteredProperties); quartzPropertiesClustered(clusteredProperties); StdSchedulerFactory factory = new StdSchedulerFactory(); @@ -176,13 +159,54 @@ public class SchedulerServiceImpl implements ISchedulerService { theScheduler.setJobFactory(mySpringBeanJobFactory); } - @PreDestroy - public void stop() throws SchedulerException { + /** + * We defer startup of executing started tasks until we're sure we're ready for it + * and the startup is completely done + */ + + @Override + public int getPhase() { + return ISmartLifecyclePhase.SCHEDULER_1000; + } + + @Override + public void start() { + try { + ourLog.info("Starting task schedulers for context {}", myApplicationContext.getId()); + if (myLocalScheduler != null) { + myLocalScheduler.start(); + } + if (myClusteredScheduler != null) { + myClusteredScheduler.start(); + } + } catch (Exception e) { + ourLog.error("Failed to start scheduler", e); + throw new ConfigurationException("Failed to start scheduler", e); + } + } + + @Override + public void stop() { ourLog.info("Shutting down task scheduler..."); myStopping.set(true); - myLocalScheduler.shutdown(true); - myClusteredScheduler.shutdown(true); + try { + myLocalScheduler.shutdown(true); + myClusteredScheduler.shutdown(true); + } catch (SchedulerException e) { + ourLog.error("Failed to shut down scheduler"); + throw new ConfigurationException("Failed to shut down scheduler", e); + } + } + + @Override + public boolean isRunning() { + try { + return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted(); + } catch (SchedulerException e) { + ourLog.error("Failed to determine scheduler status", e); + return false; + } } @Override @@ -192,7 +216,7 @@ public class SchedulerServiceImpl implements ISchedulerService { } @Override - public void logStatus() { + public void logStatusForUnitTest() { try { Set keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup()); String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", ")); @@ -207,7 +231,16 @@ public class SchedulerServiceImpl implements ISchedulerService { } @Override - public void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition) { + public void scheduleFixedDelayLocal(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + scheduleFixedDelay(theIntervalMillis, myLocalScheduler, theJobDefinition); + } + + @Override + public void scheduleFixedDelayClustered(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + scheduleFixedDelay(theIntervalMillis, myClusteredScheduler, theJobDefinition); + } + + private void scheduleFixedDelay(long theIntervalMillis, Scheduler theScheduler, ScheduledJobDefinition theJobDefinition) { Validate.isTrue(theIntervalMillis >= 100); Validate.notNull(theJobDefinition); @@ -235,13 +268,7 @@ public class SchedulerServiceImpl implements ISchedulerService { Set triggers = Sets.newHashSet(trigger); try { - Scheduler scheduler; - if (theClusteredTask) { - scheduler = myClusteredScheduler; - } else { - scheduler = myLocalScheduler; - } - scheduler.scheduleJob(jobDetail, triggers, true); + theScheduler.scheduleJob(jobDetail, triggers, true); } catch (SchedulerException e) { ourLog.error("Failed to schedule job", e); throw new InternalErrorException(e); @@ -288,292 +315,5 @@ public class SchedulerServiceImpl implements ISchedulerService { } } - private static class NullScheduler implements Scheduler { - @Override - public String getSchedulerName() { - return null; - } - - @Override - public String getSchedulerInstanceId() { - return null; - } - - @Override - public SchedulerContext getContext() { - return null; - } - - @Override - public void start() { - - } - - @Override - public void startDelayed(int seconds) { - - } - - @Override - public boolean isStarted() { - return false; - } - - @Override - public void standby() { - - } - - @Override - public boolean isInStandbyMode() { - return false; - } - - @Override - public void shutdown() { - - } - - @Override - public void shutdown(boolean waitForJobsToComplete) { - - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public SchedulerMetaData getMetaData() { - return null; - } - - @Override - public List getCurrentlyExecutingJobs() { - return null; - } - - @Override - public void setJobFactory(JobFactory factory) { - - } - - @Override - public ListenerManager getListenerManager() { - return null; - } - - @Override - public Date scheduleJob(JobDetail jobDetail, Trigger trigger) { - return null; - } - - @Override - public Date scheduleJob(Trigger trigger) { - return null; - } - - @Override - public void scheduleJobs(Map> triggersAndJobs, boolean replace) { - - } - - @Override - public void scheduleJob(JobDetail jobDetail, Set triggersForJob, boolean replace) { - - } - - @Override - public boolean unscheduleJob(TriggerKey triggerKey) { - return false; - } - - @Override - public boolean unscheduleJobs(List triggerKeys) { - return false; - } - - @Override - public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) { - return null; - } - - @Override - public void addJob(JobDetail jobDetail, boolean replace) { - - } - - @Override - public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) { - - } - - @Override - public boolean deleteJob(JobKey jobKey) { - return false; - } - - @Override - public boolean deleteJobs(List jobKeys) { - return false; - } - - @Override - public void triggerJob(JobKey jobKey) { - - } - - @Override - public void triggerJob(JobKey jobKey, JobDataMap data) { - - } - - @Override - public void pauseJob(JobKey jobKey) { - - } - - @Override - public void pauseJobs(GroupMatcher matcher) { - - } - - @Override - public void pauseTrigger(TriggerKey triggerKey) { - - } - - @Override - public void pauseTriggers(GroupMatcher matcher) { - - } - - @Override - public void resumeJob(JobKey jobKey) { - - } - - @Override - public void resumeJobs(GroupMatcher matcher) { - - } - - @Override - public void resumeTrigger(TriggerKey triggerKey) { - - } - - @Override - public void resumeTriggers(GroupMatcher matcher) { - - } - - @Override - public void pauseAll() { - - } - - @Override - public void resumeAll() { - - } - - @Override - public List getJobGroupNames() { - return null; - } - - @Override - public Set getJobKeys(GroupMatcher matcher) { - return null; - } - - @Override - public List getTriggersOfJob(JobKey jobKey) { - return null; - } - - @Override - public List getTriggerGroupNames() { - return null; - } - - @Override - public Set getTriggerKeys(GroupMatcher matcher) { - return null; - } - - @Override - public Set getPausedTriggerGroups() { - return null; - } - - @Override - public JobDetail getJobDetail(JobKey jobKey) { - return null; - } - - @Override - public Trigger getTrigger(TriggerKey triggerKey) { - return null; - } - - @Override - public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) { - return null; - } - - @Override - public void resetTriggerFromErrorState(TriggerKey triggerKey) { - - } - - @Override - public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) { - - } - - @Override - public boolean deleteCalendar(String calName) { - return false; - } - - @Override - public Calendar getCalendar(String calName) { - return null; - } - - @Override - public List getCalendarNames() { - return null; - } - - @Override - public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException { - return false; - } - - @Override - public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException { - return false; - } - - @Override - public boolean checkExists(JobKey jobKey) { - return false; - } - - @Override - public boolean checkExists(TriggerKey triggerKey) { - return false; - } - - @Override - public void clear() { - - } - } - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java index af25e9a0eea..e69bbc68b08 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java @@ -62,7 +62,7 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(StaleSearchDeletingSvcImpl.class.getName()); jobDetail.setJobClass(StaleSearchDeletingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(DEFAULT_CUTOFF_SLACK, true, jobDetail); + mySchedulerService.scheduleFixedDelayClustered(DEFAULT_CUTOFF_SLACK, jobDetail); } @Transactional(propagation = Propagation.NEVER) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java index dfa11bfaa0a..d5d13d8b69b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java @@ -55,7 +55,7 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(BaseSearchCacheSvcImpl.class.getName()); jobDetail.setJobClass(BaseSearchCacheSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java index ef98b9dbdca..cfe624d3ad5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java @@ -192,7 +192,7 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(ResourceReindexingSvcImpl.class.getName()); jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, true, jobDetail); + mySchedulerService.scheduleFixedDelayClustered(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java index 9cbf950be53..7df6430098e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java @@ -88,7 +88,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(CacheWarmingSvcImpl.class.getName()); jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULED_JOB_INTERVAL, true, jobDetail); + mySchedulerService.scheduleFixedDelayClustered(SCHEDULED_JOB_INTERVAL, jobDetail); } private void refreshNow(WarmCacheEntry theCacheEntry) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java index 756ad775c24..04ce27d09a5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java @@ -164,7 +164,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName()); jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_DELAY, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_DELAY, jobDetail); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java index c1abe4308d1..b6ef5ffb835 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java @@ -23,13 +23,8 @@ package ca.uhn.fhir.jpa.term; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.support.IContextValidationSupport; -import ca.uhn.fhir.jpa.dao.DaoConfig; -import ca.uhn.fhir.jpa.dao.DaoRegistry; -import ca.uhn.fhir.jpa.dao.IDao; -import ca.uhn.fhir.jpa.dao.IFhirResourceDaoCodeSystem; -import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet; +import ca.uhn.fhir.jpa.dao.*; import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet.ValidateCodeResult; -import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; import ca.uhn.fhir.jpa.dao.data.*; import ca.uhn.fhir.jpa.entity.*; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum; @@ -60,11 +55,7 @@ import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.time.DateUtils; import org.apache.lucene.index.Term; import org.apache.lucene.queries.TermsQuery; -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.MultiPhraseQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.RegexpQuery; +import org.apache.lucene.search.*; import org.hibernate.ScrollMode; import org.hibernate.ScrollableResults; import org.hibernate.search.jpa.FullTextEntityManager; @@ -101,21 +92,14 @@ import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import javax.persistence.PersistenceContextType; import javax.persistence.TypedQuery; -import javax.persistence.criteria.CriteriaBuilder; -import javax.persistence.criteria.CriteriaQuery; -import javax.persistence.criteria.Join; -import javax.persistence.criteria.Predicate; -import javax.persistence.criteria.Root; +import javax.persistence.criteria.*; import javax.validation.constraints.NotNull; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.defaultString; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNoneBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.*; public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationContextAware { public static final int DEFAULT_FETCH_SIZE = 250; @@ -1325,7 +1309,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo ScheduledJobDefinition vsJobDefinition = new ScheduledJobDefinition(); vsJobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_preExpandValueSets"); vsJobDefinition.setJobClass(PreExpandValueSetsJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, true, vsJobDefinition); + mySchedulerService.scheduleFixedDelayClustered(10 * DateUtils.MILLIS_PER_MINUTE, vsJobDefinition); } @Override diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index 247b3acfe9c..f151effa888 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -266,7 +266,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred"); jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); + mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java index cbc7b9b4e92..961840cd13f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java @@ -156,7 +156,7 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc { ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex"); jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); + mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); } public static class SaveDeferredJob extends FireAtIntervalJob { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java index 843736034fd..e6a3844646d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java @@ -96,7 +96,7 @@ public class ResourceCountCache { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(ResourceCountCache.class.getName()); jobDetail.setJobClass(ResourceCountCache.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_MINUTE, jobDetail); } public static class SubmitJob implements Job { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java index 8409ea0c049..0c741d693c8 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java @@ -10,7 +10,6 @@ import org.junit.runner.RunWith; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.BeanUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -18,7 +17,6 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.util.ProxyUtils; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.util.AopTestUtils; @@ -50,7 +48,7 @@ public class SchedulerServiceImplTest { .setId(CountingJob.class.getName()) .setJobClass(CountingJob.class); - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleFixedDelayLocal(100, def); sleepAtLeast(1000); @@ -69,10 +67,10 @@ public class SchedulerServiceImplTest { SchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc); svc.stop(); + svc.create(); svc.start(); - svc.contextStarted(null); - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleFixedDelayLocal(100, def); sleepAtLeast(1000); @@ -90,7 +88,7 @@ public class SchedulerServiceImplTest { .setJobClass(CountingJob.class); ourTaskDelay = 500; - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleFixedDelayLocal(100, def); sleepAtLeast(1000); @@ -108,7 +106,7 @@ public class SchedulerServiceImplTest { .setJobClass(CountingIntervalJob.class); ourTaskDelay = 500; - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleFixedDelayLocal(100, def); sleepAtLeast(2000); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java index 1cfc1e5896f..6d1d9169235 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java @@ -3,11 +3,11 @@ package ca.uhn.fhir.jpa.subscription.resthook; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl; -import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.Update; @@ -16,6 +16,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.test.utilities.JettyUtil; import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -34,8 +35,6 @@ import java.util.List; import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.*; -import ca.uhn.fhir.test.utilities.JettyUtil; - /** * Test the rest-hook subscriptions */ @@ -101,7 +100,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourUpdatedPatients.clear(); ourContentTypes.clear(); - mySchedulerService.logStatus(); + mySchedulerService.logStatusForUnitTest(); } private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java index 599b12a9ead..a5446cbb67b 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java @@ -28,14 +28,21 @@ public interface ISchedulerService { @VisibleForTesting void purgeAllScheduledJobsForUnitTest() throws SchedulerException; - void logStatus(); + void logStatusForUnitTest(); /** + * Only one instance of this task will fire across the whole cluster (when running in a clustered environment). * @param theIntervalMillis How many milliseconds between passes should this job run - * @param theClusteredTask If true, only one instance of this task will fire across the whole cluster (when running in a clustered environment). If false, or if not running in a clustered environment, this task will execute locally (and should execute on all nodes of the cluster) * @param theJobDefinition The Job to fire */ - void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition); + void scheduleFixedDelayLocal(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + + /** + * This task will execute locally (and should execute on all nodes of the cluster if there is a cluster) + * @param theIntervalMillis How many milliseconds between passes should this job run + * @param theJobDefinition The Job to fire + */ + void scheduleFixedDelayClustered(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); boolean isStopping(); } diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java index d9875583e24..a29253ac7f4 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java @@ -47,11 +47,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.dstu3.model.Extension; import org.hl7.fhir.dstu3.model.SearchParameter; -import org.hl7.fhir.instance.model.api.IBaseExtension; -import org.hl7.fhir.instance.model.api.IBaseHasExtensions; -import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.instance.model.api.IIdType; -import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.instance.model.api.*; import org.hl7.fhir.r4.model.Reference; import org.quartz.Job; import org.quartz.JobExecutionContext; @@ -60,14 +56,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -739,7 +728,7 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(SearchParamRegistryImpl.class.getName()); jobDetail.setJobClass(SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); } @Override diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java index 1f5e4ae998b..5ff6e90b186 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java @@ -94,7 +94,7 @@ public class SubscriptionLoader { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(SubscriptionLoader.class.getName()); jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(REFRESH_INTERVAL, jobDetail); } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java index b21b6b27050..93958633bcf 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java @@ -70,7 +70,7 @@ public class AnalyticsInterceptor extends InterceptorAdapter { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(getClass().getName()); jobDetail.setJobClass(SubmitJob.class); - mySchedulerService.scheduleFixedDelay(5000, false, jobDetail); + mySchedulerService.scheduleFixedDelayLocal(5000, jobDetail); } @PreDestroy