remove boolean from scheduling API

This commit is contained in:
Ken Stevens 2019-11-19 10:13:22 -05:00
parent 3173af31ae
commit 8fca468e1f
19 changed files with 418 additions and 394 deletions

View File

@ -0,0 +1,10 @@
package ca.uhn.fhir.model.api;
public interface ISmartLifecyclePhase {
// POST_CONSTRUCT is here as a marker for where post-construct fits into the smart lifecycle. Beans with negative phases
// will be started before @PostConstruct are called
int POST_CONSTRUCT = 0;
// We want to start scheduled tasks fairly late in the startup process
int SCHEDULER_1000 = 1000;
}

View File

@ -25,13 +25,13 @@ import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
@ -319,7 +319,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(BulkDataExportSvcImpl.class.getName()); jobDetail.setId(BulkDataExportSvcImpl.class.getName());
jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class); jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, true, jobDetail); mySchedulerService.scheduleFixedDelayClustered(REFRESH_INTERVAL, jobDetail);
} }
@Transactional @Transactional

View File

@ -0,0 +1,297 @@
package ca.uhn.fhir.jpa.sched;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.JobFactory;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
class NullScheduler implements Scheduler {
@Override
public String getSchedulerName() {
return null;
}
@Override
public String getSchedulerInstanceId() {
return null;
}
@Override
public SchedulerContext getContext() {
return null;
}
@Override
public void start() {
}
@Override
public void startDelayed(int seconds) {
}
@Override
public boolean isStarted() {
return false;
}
@Override
public void standby() {
}
@Override
public boolean isInStandbyMode() {
return false;
}
@Override
public void shutdown() {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public SchedulerMetaData getMetaData() {
return null;
}
@Override
public List<JobExecutionContext> getCurrentlyExecutingJobs() {
return null;
}
@Override
public void setJobFactory(JobFactory factory) {
}
@Override
public ListenerManager getListenerManager() {
return null;
}
@Override
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) {
return null;
}
@Override
public Date scheduleJob(Trigger trigger) {
return null;
}
@Override
public void scheduleJobs(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) {
}
@Override
public void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) {
}
@Override
public boolean unscheduleJob(TriggerKey triggerKey) {
return false;
}
@Override
public boolean unscheduleJobs(List<TriggerKey> triggerKeys) {
return false;
}
@Override
public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) {
return null;
}
@Override
public void addJob(JobDetail jobDetail, boolean replace) {
}
@Override
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) {
}
@Override
public boolean deleteJob(JobKey jobKey) {
return false;
}
@Override
public boolean deleteJobs(List<JobKey> jobKeys) {
return false;
}
@Override
public void triggerJob(JobKey jobKey) {
}
@Override
public void triggerJob(JobKey jobKey, JobDataMap data) {
}
@Override
public void pauseJob(JobKey jobKey) {
}
@Override
public void pauseJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void pauseTrigger(TriggerKey triggerKey) {
}
@Override
public void pauseTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void resumeJob(JobKey jobKey) {
}
@Override
public void resumeJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void resumeTrigger(TriggerKey triggerKey) {
}
@Override
public void resumeTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void pauseAll() {
}
@Override
public void resumeAll() {
}
@Override
public List<String> getJobGroupNames() {
return null;
}
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) {
return null;
}
@Override
public List<? extends Trigger> getTriggersOfJob(JobKey jobKey) {
return null;
}
@Override
public List<String> getTriggerGroupNames() {
return null;
}
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher) {
return null;
}
@Override
public Set<String> getPausedTriggerGroups() {
return null;
}
@Override
public JobDetail getJobDetail(JobKey jobKey) {
return null;
}
@Override
public Trigger getTrigger(TriggerKey triggerKey) {
return null;
}
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) {
return null;
}
@Override
public void resetTriggerFromErrorState(TriggerKey triggerKey) {
}
@Override
public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) {
}
@Override
public boolean deleteCalendar(String calName) {
return false;
}
@Override
public Calendar getCalendar(String calName) {
return null;
}
@Override
public List<String> getCalendarNames() {
return null;
}
@Override
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean checkExists(JobKey jobKey) {
return false;
}
@Override
public boolean checkExists(TriggerKey triggerKey) {
return false;
}
@Override
public void clear() {
}
}

View File

@ -20,28 +20,29 @@ package ca.uhn.fhir.jpa.sched;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.ISmartLifecyclePhase;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.quartz.Calendar;
import org.quartz.*; import org.quartz.*;
import org.quartz.impl.JobDetailImpl; import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.JobFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener; import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import java.util.Properties;
import java.util.*; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME; import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
@ -63,22 +64,24 @@ import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
* </li> * </li>
* </ul> * </ul>
*/ */
public class SchedulerServiceImpl implements ISchedulerService { public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
public static final String SCHEDULING_DISABLED = "scheduling_disabled"; public static final String SCHEDULING_DISABLED = "scheduling_disabled";
public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true"; public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true";
private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class);
private static int ourNextSchedulerId = 0; private static AtomicInteger ourNextSchedulerId = new AtomicInteger();
private Scheduler myLocalScheduler; private Scheduler myLocalScheduler;
private Scheduler myClusteredScheduler; private Scheduler myClusteredScheduler;
private String myThreadNamePrefix; private String myThreadNamePrefix;
private boolean myLocalSchedulingEnabled; private boolean myLocalSchedulingEnabled;
private boolean myClusteredSchedulingEnabled; private boolean myClusteredSchedulingEnabled;
@Autowired
private AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
private AtomicBoolean myStopping = new AtomicBoolean(false); private AtomicBoolean myStopping = new AtomicBoolean(false);
@Autowired @Autowired
private AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
@Autowired
private Environment myEnvironment; private Environment myEnvironment;
@Autowired
private ApplicationContext myApplicationContext;
/** /**
* Constructor * Constructor
@ -114,38 +117,18 @@ public class SchedulerServiceImpl implements ISchedulerService {
} }
@PostConstruct @PostConstruct
public void start() throws SchedulerException { public void create() throws SchedulerException {
myLocalScheduler = createLocalScheduler(); myLocalScheduler = createLocalScheduler();
myClusteredScheduler = createClusteredScheduler(); myClusteredScheduler = createClusteredScheduler();
myStopping.set(false); myStopping.set(false);
} }
/**
* We defer startup of executing started tasks until we're sure we're ready for it
* and the startup is completely done
*/
@EventListener
public void contextStarted(ContextRefreshedEvent theEvent) throws SchedulerException {
try {
ourLog.info("Starting task schedulers for context {}", theEvent != null ? theEvent.getApplicationContext().getId() : "null");
if (myLocalScheduler != null) {
myLocalScheduler.start();
}
if (myClusteredScheduler != null) {
myClusteredScheduler.start();
}
} catch (Exception e) {
ourLog.error("Failed to start context", e);
throw new SchedulerException(e);
}
}
private Scheduler createLocalScheduler() throws SchedulerException { private Scheduler createLocalScheduler() throws SchedulerException {
if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) { if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) {
return new NullScheduler(); return new NullScheduler();
} }
Properties localProperties = new Properties(); Properties localProperties = new Properties();
localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId++); localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId.getAndIncrement());
quartzPropertiesCommon(localProperties); quartzPropertiesCommon(localProperties);
quartzPropertiesLocal(localProperties); quartzPropertiesLocal(localProperties);
StdSchedulerFactory factory = new StdSchedulerFactory(); StdSchedulerFactory factory = new StdSchedulerFactory();
@ -161,7 +144,7 @@ public class SchedulerServiceImpl implements ISchedulerService {
return new NullScheduler(); return new NullScheduler();
} }
Properties clusteredProperties = new Properties(); Properties clusteredProperties = new Properties();
clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId++); clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId.getAndIncrement());
quartzPropertiesCommon(clusteredProperties); quartzPropertiesCommon(clusteredProperties);
quartzPropertiesClustered(clusteredProperties); quartzPropertiesClustered(clusteredProperties);
StdSchedulerFactory factory = new StdSchedulerFactory(); StdSchedulerFactory factory = new StdSchedulerFactory();
@ -176,13 +159,54 @@ public class SchedulerServiceImpl implements ISchedulerService {
theScheduler.setJobFactory(mySpringBeanJobFactory); theScheduler.setJobFactory(mySpringBeanJobFactory);
} }
@PreDestroy /**
public void stop() throws SchedulerException { * We defer startup of executing started tasks until we're sure we're ready for it
* and the startup is completely done
*/
@Override
public int getPhase() {
return ISmartLifecyclePhase.SCHEDULER_1000;
}
@Override
public void start() {
try {
ourLog.info("Starting task schedulers for context {}", myApplicationContext.getId());
if (myLocalScheduler != null) {
myLocalScheduler.start();
}
if (myClusteredScheduler != null) {
myClusteredScheduler.start();
}
} catch (Exception e) {
ourLog.error("Failed to start scheduler", e);
throw new ConfigurationException("Failed to start scheduler", e);
}
}
@Override
public void stop() {
ourLog.info("Shutting down task scheduler..."); ourLog.info("Shutting down task scheduler...");
myStopping.set(true); myStopping.set(true);
try {
myLocalScheduler.shutdown(true); myLocalScheduler.shutdown(true);
myClusteredScheduler.shutdown(true); myClusteredScheduler.shutdown(true);
} catch (SchedulerException e) {
ourLog.error("Failed to shut down scheduler");
throw new ConfigurationException("Failed to shut down scheduler", e);
}
}
@Override
public boolean isRunning() {
try {
return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted();
} catch (SchedulerException e) {
ourLog.error("Failed to determine scheduler status", e);
return false;
}
} }
@Override @Override
@ -192,7 +216,7 @@ public class SchedulerServiceImpl implements ISchedulerService {
} }
@Override @Override
public void logStatus() { public void logStatusForUnitTest() {
try { try {
Set<JobKey> keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup()); Set<JobKey> keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup());
String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", ")); String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
@ -207,7 +231,16 @@ public class SchedulerServiceImpl implements ISchedulerService {
} }
@Override @Override
public void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition) { public void scheduleFixedDelayLocal(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleFixedDelay(theIntervalMillis, myLocalScheduler, theJobDefinition);
}
@Override
public void scheduleFixedDelayClustered(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleFixedDelay(theIntervalMillis, myClusteredScheduler, theJobDefinition);
}
private void scheduleFixedDelay(long theIntervalMillis, Scheduler theScheduler, ScheduledJobDefinition theJobDefinition) {
Validate.isTrue(theIntervalMillis >= 100); Validate.isTrue(theIntervalMillis >= 100);
Validate.notNull(theJobDefinition); Validate.notNull(theJobDefinition);
@ -235,13 +268,7 @@ public class SchedulerServiceImpl implements ISchedulerService {
Set<? extends Trigger> triggers = Sets.newHashSet(trigger); Set<? extends Trigger> triggers = Sets.newHashSet(trigger);
try { try {
Scheduler scheduler; theScheduler.scheduleJob(jobDetail, triggers, true);
if (theClusteredTask) {
scheduler = myClusteredScheduler;
} else {
scheduler = myLocalScheduler;
}
scheduler.scheduleJob(jobDetail, triggers, true);
} catch (SchedulerException e) { } catch (SchedulerException e) {
ourLog.error("Failed to schedule job", e); ourLog.error("Failed to schedule job", e);
throw new InternalErrorException(e); throw new InternalErrorException(e);
@ -288,292 +315,5 @@ public class SchedulerServiceImpl implements ISchedulerService {
} }
} }
private static class NullScheduler implements Scheduler {
@Override
public String getSchedulerName() {
return null;
}
@Override
public String getSchedulerInstanceId() {
return null;
}
@Override
public SchedulerContext getContext() {
return null;
}
@Override
public void start() {
}
@Override
public void startDelayed(int seconds) {
}
@Override
public boolean isStarted() {
return false;
}
@Override
public void standby() {
}
@Override
public boolean isInStandbyMode() {
return false;
}
@Override
public void shutdown() {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public SchedulerMetaData getMetaData() {
return null;
}
@Override
public List<JobExecutionContext> getCurrentlyExecutingJobs() {
return null;
}
@Override
public void setJobFactory(JobFactory factory) {
}
@Override
public ListenerManager getListenerManager() {
return null;
}
@Override
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) {
return null;
}
@Override
public Date scheduleJob(Trigger trigger) {
return null;
}
@Override
public void scheduleJobs(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) {
}
@Override
public void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) {
}
@Override
public boolean unscheduleJob(TriggerKey triggerKey) {
return false;
}
@Override
public boolean unscheduleJobs(List<TriggerKey> triggerKeys) {
return false;
}
@Override
public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) {
return null;
}
@Override
public void addJob(JobDetail jobDetail, boolean replace) {
}
@Override
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) {
}
@Override
public boolean deleteJob(JobKey jobKey) {
return false;
}
@Override
public boolean deleteJobs(List<JobKey> jobKeys) {
return false;
}
@Override
public void triggerJob(JobKey jobKey) {
}
@Override
public void triggerJob(JobKey jobKey, JobDataMap data) {
}
@Override
public void pauseJob(JobKey jobKey) {
}
@Override
public void pauseJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void pauseTrigger(TriggerKey triggerKey) {
}
@Override
public void pauseTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void resumeJob(JobKey jobKey) {
}
@Override
public void resumeJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void resumeTrigger(TriggerKey triggerKey) {
}
@Override
public void resumeTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void pauseAll() {
}
@Override
public void resumeAll() {
}
@Override
public List<String> getJobGroupNames() {
return null;
}
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) {
return null;
}
@Override
public List<? extends Trigger> getTriggersOfJob(JobKey jobKey) {
return null;
}
@Override
public List<String> getTriggerGroupNames() {
return null;
}
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher) {
return null;
}
@Override
public Set<String> getPausedTriggerGroups() {
return null;
}
@Override
public JobDetail getJobDetail(JobKey jobKey) {
return null;
}
@Override
public Trigger getTrigger(TriggerKey triggerKey) {
return null;
}
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) {
return null;
}
@Override
public void resetTriggerFromErrorState(TriggerKey triggerKey) {
}
@Override
public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) {
}
@Override
public boolean deleteCalendar(String calName) {
return false;
}
@Override
public Calendar getCalendar(String calName) {
return null;
}
@Override
public List<String> getCalendarNames() {
return null;
}
@Override
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean checkExists(JobKey jobKey) {
return false;
}
@Override
public boolean checkExists(TriggerKey triggerKey) {
return false;
}
@Override
public void clear() {
}
}
} }

View File

@ -62,7 +62,7 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(StaleSearchDeletingSvcImpl.class.getName()); jobDetail.setId(StaleSearchDeletingSvcImpl.class.getName());
jobDetail.setJobClass(StaleSearchDeletingSvcImpl.SubmitJob.class); jobDetail.setJobClass(StaleSearchDeletingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(DEFAULT_CUTOFF_SLACK, true, jobDetail); mySchedulerService.scheduleFixedDelayClustered(DEFAULT_CUTOFF_SLACK, jobDetail);
} }
@Transactional(propagation = Propagation.NEVER) @Transactional(propagation = Propagation.NEVER)

View File

@ -55,7 +55,7 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(BaseSearchCacheSvcImpl.class.getName()); jobDetail.setId(BaseSearchCacheSvcImpl.class.getName());
jobDetail.setJobClass(BaseSearchCacheSvcImpl.SubmitJob.class); jobDetail.setJobClass(BaseSearchCacheSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
} }
@Override @Override

View File

@ -192,7 +192,7 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(ResourceReindexingSvcImpl.class.getName()); jobDetail.setId(ResourceReindexingSvcImpl.class.getName());
jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class); jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, true, jobDetail); mySchedulerService.scheduleFixedDelayClustered(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -88,7 +88,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(CacheWarmingSvcImpl.class.getName()); jobDetail.setId(CacheWarmingSvcImpl.class.getName());
jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class); jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULED_JOB_INTERVAL, true, jobDetail); mySchedulerService.scheduleFixedDelayClustered(SCHEDULED_JOB_INTERVAL, jobDetail);
} }
private void refreshNow(WarmCacheEntry theCacheEntry) { private void refreshNow(WarmCacheEntry theCacheEntry) {

View File

@ -164,7 +164,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName()); jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName());
jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class); jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_DELAY, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_DELAY, jobDetail);
} }
@Override @Override

View File

@ -23,13 +23,8 @@ package ca.uhn.fhir.jpa.term;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.support.IContextValidationSupport; import ca.uhn.fhir.context.support.IContextValidationSupport;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet.ValidateCodeResult; import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet.ValidateCodeResult;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.*; import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.entity.*; import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum;
@ -60,11 +55,7 @@ import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.queries.TermsQuery; import org.apache.lucene.queries.TermsQuery;
import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.*;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MultiPhraseQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.RegexpQuery;
import org.hibernate.ScrollMode; import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults; import org.hibernate.ScrollableResults;
import org.hibernate.search.jpa.FullTextEntityManager; import org.hibernate.search.jpa.FullTextEntityManager;
@ -101,21 +92,14 @@ import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext; import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType; import javax.persistence.PersistenceContextType;
import javax.persistence.TypedQuery; import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder; import javax.persistence.criteria.*;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Join;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultString; import static org.apache.commons.lang3.StringUtils.*;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNoneBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationContextAware { public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationContextAware {
public static final int DEFAULT_FETCH_SIZE = 250; public static final int DEFAULT_FETCH_SIZE = 250;
@ -1325,7 +1309,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo
ScheduledJobDefinition vsJobDefinition = new ScheduledJobDefinition(); ScheduledJobDefinition vsJobDefinition = new ScheduledJobDefinition();
vsJobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_preExpandValueSets"); vsJobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_preExpandValueSets");
vsJobDefinition.setJobClass(PreExpandValueSetsJob.class); vsJobDefinition.setJobClass(PreExpandValueSetsJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, true, vsJobDefinition); mySchedulerService.scheduleFixedDelayClustered(10 * DateUtils.MILLIS_PER_MINUTE, vsJobDefinition);
} }
@Override @Override

View File

@ -266,7 +266,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred"); jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred");
jobDefinition.setJobClass(SaveDeferredJob.class); jobDefinition.setJobClass(SaveDeferredJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -156,7 +156,7 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc {
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex"); jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex");
jobDefinition.setJobClass(SaveDeferredJob.class); jobDefinition.setJobClass(SaveDeferredJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition);
} }
public static class SaveDeferredJob extends FireAtIntervalJob { public static class SaveDeferredJob extends FireAtIntervalJob {

View File

@ -96,7 +96,7 @@ public class ResourceCountCache {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(ResourceCountCache.class.getName()); jobDetail.setId(ResourceCountCache.class.getName());
jobDetail.setJobClass(ResourceCountCache.SubmitJob.class); jobDetail.setJobClass(ResourceCountCache.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_MINUTE, jobDetail);
} }
public static class SubmitJob implements Job { public static class SubmitJob implements Job {

View File

@ -10,7 +10,6 @@ import org.junit.runner.RunWith;
import org.quartz.*; import org.quartz.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
@ -18,7 +17,6 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.util.ProxyUtils;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.util.AopTestUtils; import org.springframework.test.util.AopTestUtils;
@ -50,7 +48,7 @@ public class SchedulerServiceImplTest {
.setId(CountingJob.class.getName()) .setId(CountingJob.class.getName())
.setJobClass(CountingJob.class); .setJobClass(CountingJob.class);
mySvc.scheduleFixedDelay(100, false, def); mySvc.scheduleFixedDelayLocal(100, def);
sleepAtLeast(1000); sleepAtLeast(1000);
@ -69,10 +67,10 @@ public class SchedulerServiceImplTest {
SchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc); SchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc);
svc.stop(); svc.stop();
svc.create();
svc.start(); svc.start();
svc.contextStarted(null);
mySvc.scheduleFixedDelay(100, false, def); mySvc.scheduleFixedDelayLocal(100, def);
sleepAtLeast(1000); sleepAtLeast(1000);
@ -90,7 +88,7 @@ public class SchedulerServiceImplTest {
.setJobClass(CountingJob.class); .setJobClass(CountingJob.class);
ourTaskDelay = 500; ourTaskDelay = 500;
mySvc.scheduleFixedDelay(100, false, def); mySvc.scheduleFixedDelayLocal(100, def);
sleepAtLeast(1000); sleepAtLeast(1000);
@ -108,7 +106,7 @@ public class SchedulerServiceImplTest {
.setJobClass(CountingIntervalJob.class); .setJobClass(CountingIntervalJob.class);
ourTaskDelay = 500; ourTaskDelay = 500;
mySvc.scheduleFixedDelay(100, false, def); mySvc.scheduleFixedDelayLocal(100, def);
sleepAtLeast(2000); sleepAtLeast(2000);

View File

@ -3,11 +3,11 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl; import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.annotation.Update;
@ -16,6 +16,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.test.utilities.JettyUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
@ -34,8 +35,6 @@ import java.util.List;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import ca.uhn.fhir.test.utilities.JettyUtil;
/** /**
* Test the rest-hook subscriptions * Test the rest-hook subscriptions
*/ */
@ -101,7 +100,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourUpdatedPatients.clear(); ourUpdatedPatients.clear();
ourContentTypes.clear(); ourContentTypes.clear();
mySchedulerService.logStatus(); mySchedulerService.logStatusForUnitTest();
} }
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {

View File

@ -28,14 +28,21 @@ public interface ISchedulerService {
@VisibleForTesting @VisibleForTesting
void purgeAllScheduledJobsForUnitTest() throws SchedulerException; void purgeAllScheduledJobsForUnitTest() throws SchedulerException;
void logStatus(); void logStatusForUnitTest();
/** /**
* Only one instance of this task will fire across the whole cluster (when running in a clustered environment).
* @param theIntervalMillis How many milliseconds between passes should this job run * @param theIntervalMillis How many milliseconds between passes should this job run
* @param theClusteredTask If <code>true</code>, only one instance of this task will fire across the whole cluster (when running in a clustered environment). If <code>false</code>, or if not running in a clustered environment, this task will execute locally (and should execute on all nodes of the cluster)
* @param theJobDefinition The Job to fire * @param theJobDefinition The Job to fire
*/ */
void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition); void scheduleFixedDelayLocal(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
/**
* This task will execute locally (and should execute on all nodes of the cluster if there is a cluster)
* @param theIntervalMillis How many milliseconds between passes should this job run
* @param theJobDefinition The Job to fire
*/
void scheduleFixedDelayClustered(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
boolean isStopping(); boolean isStopping();
} }

View File

@ -47,11 +47,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.Extension; import org.hl7.fhir.dstu3.model.Extension;
import org.hl7.fhir.dstu3.model.SearchParameter; import org.hl7.fhir.dstu3.model.SearchParameter;
import org.hl7.fhir.instance.model.api.IBaseExtension; import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Reference; import org.hl7.fhir.r4.model.Reference;
import org.quartz.Job; import org.quartz.Job;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
@ -60,14 +56,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
@ -739,7 +728,7 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SearchParamRegistryImpl.class.getName()); jobDetail.setId(SearchParamRegistryImpl.class.getName());
jobDetail.setJobClass(SubmitJob.class); jobDetail.setJobClass(SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
} }
@Override @Override

View File

@ -94,7 +94,7 @@ public class SubscriptionLoader {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionLoader.class.getName()); jobDetail.setId(SubscriptionLoader.class.getName());
jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class); jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(REFRESH_INTERVAL, jobDetail);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -70,7 +70,7 @@ public class AnalyticsInterceptor extends InterceptorAdapter {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName()); jobDetail.setId(getClass().getName());
jobDetail.setJobClass(SubmitJob.class); jobDetail.setJobClass(SubmitJob.class);
mySchedulerService.scheduleFixedDelay(5000, false, jobDetail); mySchedulerService.scheduleFixedDelayLocal(5000, jobDetail);
} }
@PreDestroy @PreDestroy