diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseFlywayMigrateDatabaseCommand.java b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseFlywayMigrateDatabaseCommand.java index c3118f401fc..8f521b8fd73 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseFlywayMigrateDatabaseCommand.java +++ b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseFlywayMigrateDatabaseCommand.java @@ -29,6 +29,9 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.List; +import java.util.Set; import java.util.*; import java.util.stream.Collectors; @@ -66,13 +69,6 @@ public abstract class BaseFlywayMigrateDatabaseCommand extends B return MIGRATE_DATABASE; } - @Override - public List provideUsageNotes() { - String versions = "The following versions are supported: " + - provideAllowedVersions().stream().map(Enum::name).collect(Collectors.joining(", ")); - return Collections.singletonList(versions); - } - @Override public Options getOptions() { Options retVal = new Options(); diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/HapiFlywayMigrateDatabaseCommandTest.java b/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/HapiFlywayMigrateDatabaseCommandTest.java index b3ea3c808f2..79dfd067536 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/HapiFlywayMigrateDatabaseCommandTest.java +++ b/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/HapiFlywayMigrateDatabaseCommandTest.java @@ -37,6 +37,7 @@ public class HapiFlywayMigrateDatabaseCommandTest { System.setProperty("test", "true"); } + // TODO INTERMITTENT This just failed for me on CI with a BadSqlGrammarException @Test public void testMigrateFrom340() throws IOException, SQLException { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java index 987936d8677..b6967639c5c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java @@ -25,14 +25,14 @@ import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; -import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; @@ -53,9 +53,7 @@ import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.InstantType; -import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; -import org.quartz.PersistJobDataAfterExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -80,7 +78,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class BulkDataExportSvcImpl implements IBulkDataExportSvc { - private static final long REFRESH_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class); private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE); @@ -313,13 +310,22 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @PostConstruct public void start() { - ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(REFRESH_INTERVAL)); myTxTemplate = new TransactionTemplate(myTxManager); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(BulkDataExportSvcImpl.class.getName()); - jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, true, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private IBulkDataExportSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.buildExportFiles(); + } } @Transactional @@ -468,22 +474,4 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { return null; }); } - - @DisallowConcurrentExecution - @PersistJobDataAfterExecution - public static class SubmitJob extends FireAtIntervalJob { - @Autowired - private IBulkDataExportSvc myTarget; - - public SubmitJob() { - super(REFRESH_INTERVAL); - } - - @Override - protected void doExecute(JobExecutionContext theContext) { - myTarget.buildExportFiles(); - } - } - - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 0fdb7f9de30..759983d09ba 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -10,14 +10,13 @@ import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.DaoRegistry; -import ca.uhn.fhir.jpa.delete.DeleteConflictService; import ca.uhn.fhir.jpa.graphql.JpaStorageServices; import ca.uhn.fhir.jpa.interceptor.JpaConsentContextServices; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider; import ca.uhn.fhir.jpa.sched.AutowiringSpringBeanJobFactory; -import ca.uhn.fhir.jpa.sched.SchedulerServiceImpl; +import ca.uhn.fhir.jpa.sched.HapiSchedulerServiceImpl; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc; import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl; @@ -36,13 +35,6 @@ import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribable import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher; -import ca.uhn.fhir.jpa.term.TermCodeSystemStorageSvcImpl; -import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl; -import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc; -import ca.uhn.fhir.jpa.term.TermReindexingSvcImpl; -import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; -import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; -import ca.uhn.fhir.jpa.term.api.ITermReindexingSvc; import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices; import org.hibernate.jpa.HibernatePersistenceProvider; import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices; @@ -91,6 +83,7 @@ public abstract class BaseConfig { public static final String TASK_EXECUTOR_NAME = "hapiJpaTaskExecutor"; public static final String GRAPHQL_PROVIDER_NAME = "myGraphQLProvider"; + private static final String HAPI_DEFAULT_SCHEDULER_GROUP = "HAPI"; @Autowired protected Environment myEnv; @@ -257,7 +250,7 @@ public abstract class BaseConfig { @Bean public ISchedulerService schedulerService() { - return new SchedulerServiceImpl(); + return new HapiSchedulerServiceImpl().setDefaultGroup(HAPI_DEFAULT_SCHEDULER_GROUP); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java new file mode 100644 index 00000000000..418e9b8d050 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java @@ -0,0 +1,184 @@ +package ca.uhn.fhir.jpa.sched; + +import ca.uhn.fhir.context.ConfigurationException; +import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; +import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.Validate; +import org.jetbrains.annotations.NotNull; +import org.quartz.*; +import org.quartz.impl.JobDetailImpl; +import org.quartz.impl.StdSchedulerFactory; +import org.quartz.impl.matchers.GroupMatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME; + +public abstract class BaseHapiScheduler implements IHapiScheduler { + private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiScheduler.class); + + private static final AtomicInteger ourNextSchedulerId = new AtomicInteger(); + + private final String myThreadNamePrefix; + private final AutowiringSpringBeanJobFactory mySpringBeanJobFactory; + private final StdSchedulerFactory myFactory = new StdSchedulerFactory(); + private final Properties myProperties = new Properties(); + + private Scheduler myScheduler; + private String myInstanceName; + + public BaseHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) { + myThreadNamePrefix = theThreadNamePrefix; + mySpringBeanJobFactory = theSpringBeanJobFactory; + } + + + void setInstanceName(String theInstanceName) { + myInstanceName = theInstanceName; + } + + + int nextSchedulerId() { + return ourNextSchedulerId.getAndIncrement(); + } + + @Override + public void init() throws SchedulerException { + setProperties(); + myFactory.initialize(myProperties); + myScheduler = myFactory.getScheduler(); + myScheduler.setJobFactory(mySpringBeanJobFactory); + myScheduler.standby(); + } + + protected void setProperties() { + addProperty("org.quartz.threadPool.threadCount", "4"); + myProperties.setProperty(PROP_SCHED_INSTANCE_NAME, myInstanceName + "-" + nextSchedulerId()); + addProperty("org.quartz.threadPool.threadNamePrefix", getThreadPrefix()); + } + + @NotNull + private String getThreadPrefix() { + return myThreadNamePrefix + "-" + myInstanceName; + } + + protected void addProperty(String key, String value) { + myProperties.put(key, value); + } + + @Override + public void start() { + if (myScheduler == null) { + throw new ConfigurationException("Attempt to start uninitialized scheduler"); + } + try { + ourLog.info("Starting scheduler {}", getThreadPrefix()); + myScheduler.start(); + } catch (SchedulerException e) { + ourLog.error("Failed to start up scheduler", e); + throw new ConfigurationException("Failed to start up scheduler", e); + } + } + + @Override + public void shutdown() { + if (myScheduler == null) { + return; + } + try { + myScheduler.shutdown(true); + } catch (SchedulerException e) { + ourLog.error("Failed to shut down scheduler", e); + throw new ConfigurationException("Failed to shut down scheduler", e); + } + } + + @Override + public boolean isStarted() { + try { + return myScheduler != null && myScheduler.isStarted(); + } catch (SchedulerException e) { + ourLog.error("Failed to determine scheduler status", e); + return false; + } + } + + @Override + public void clear() throws SchedulerException { + myScheduler.clear(); + } + + @Override + public void logStatusForUnitTest() { + try { + Set keys = myScheduler.getJobKeys(GroupMatcher.anyGroup()); + String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", ")); + ourLog.info("Local scheduler has jobs: {}", keysString); + } catch (SchedulerException e) { + ourLog.error("Failed to get log status for scheduler", e); + throw new InternalErrorException("Failed to get log status for scheduler", e); + } + } + + @Override + public void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + Validate.isTrue(theIntervalMillis >= 100); + + Validate.notNull(theJobDefinition); + Validate.notNull(theJobDefinition.getJobClass()); + Validate.notBlank(theJobDefinition.getId()); + + JobKey jobKey; + + jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup()); + + JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl(); + jobDetail.setJobClass(theJobDefinition.getJobClass()); + jobDetail.setKey(jobKey); + jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData())); + + ScheduleBuilder schedule = SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInMilliseconds(theIntervalMillis) + .repeatForever(); + + Trigger trigger = TriggerBuilder.newTrigger() + .forJob(jobDetail) + .startNow() + .withSchedule(schedule) + .build(); + + Set triggers = Sets.newHashSet(trigger); + try { + myScheduler.scheduleJob(jobDetail, triggers, true); + } catch (SchedulerException e) { + ourLog.error("Failed to schedule job", e); + throw new InternalErrorException(e); + } + + } + + @VisibleForTesting + @Override + public Set getJobKeysForUnitTest() throws SchedulerException { + return myScheduler.getJobKeys(GroupMatcher.anyGroup()); + } + + private static class NonConcurrentJobDetailImpl extends JobDetailImpl { + private static final long serialVersionUID = 5716197221121989740L; + + // All HAPI FHIR jobs shouldn't allow concurrent execution + @Override + public boolean isConcurrentExectionDisallowed() { + return true; + } + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java new file mode 100644 index 00000000000..77da433d765 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java @@ -0,0 +1,225 @@ +package ca.uhn.fhir.jpa.sched; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2019 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.ConfigurationException; +import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.model.sched.ISmartLifecyclePhase; +import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import ca.uhn.fhir.util.StopWatch; +import com.google.common.annotations.VisibleForTesting; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.SmartLifecycle; +import org.springframework.core.env.Environment; + +import javax.annotation.PostConstruct; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class provides task scheduling for the entire module using the Quartz library. + * Inside here, we have two schedulers: + *
    + *
  • + * The Local Scheduler handles tasks that need to execute locally. This + * typically means things that should happen on all nodes in a clustered + * environment. + *
  • + *
  • + * The Cluster Scheduler handles tasks that are distributed and should be + * handled by only one node in the cluster (assuming a clustered server). If the + * server is not clustered, this scheduler acts the same way as the + * local scheduler. + *
  • + *
+ */ +public abstract class BaseSchedulerServiceImpl implements ISchedulerService, SmartLifecycle { + public static final String SCHEDULING_DISABLED = "scheduling_disabled"; + public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true"; + + private static final Logger ourLog = LoggerFactory.getLogger(BaseSchedulerServiceImpl.class); + private IHapiScheduler myLocalScheduler; + private IHapiScheduler myClusteredScheduler; + private boolean myLocalSchedulingEnabled; + private boolean myClusteredSchedulingEnabled; + private AtomicBoolean myStopping = new AtomicBoolean(false); + private String myDefaultGroup; + + @Autowired + private Environment myEnvironment; + @Autowired + private ApplicationContext myApplicationContext; + @Autowired + protected AutowiringSpringBeanJobFactory mySchedulerJobFactory; + + public BaseSchedulerServiceImpl() { + setLocalSchedulingEnabled(true); + setClusteredSchedulingEnabled(true); + } + + public BaseSchedulerServiceImpl setDefaultGroup(String theDefaultGroup) { + myDefaultGroup = theDefaultGroup; + return this; + } + + public boolean isLocalSchedulingEnabled() { + return myLocalSchedulingEnabled; + } + + public void setLocalSchedulingEnabled(boolean theLocalSchedulingEnabled) { + myLocalSchedulingEnabled = theLocalSchedulingEnabled; + } + + public boolean isClusteredSchedulingEnabled() { + return myClusteredSchedulingEnabled; + } + + public void setClusteredSchedulingEnabled(boolean theClusteredSchedulingEnabled) { + myClusteredSchedulingEnabled = theClusteredSchedulingEnabled; + } + + @PostConstruct + public void create() throws SchedulerException { + myLocalScheduler = createScheduler(false); + myClusteredScheduler = createScheduler(true); + myStopping.set(false); + } + + private IHapiScheduler createScheduler(boolean theClustered) throws SchedulerException { + if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) { + ourLog.info("Scheduling is disabled on this server"); + return new HapiNullScheduler(); + } + IHapiScheduler retval; + if (theClustered) { + ourLog.info("Creating Clustered Scheduler"); + retval = getClusteredScheduler(); + } else { + ourLog.info("Creating Local Scheduler"); + retval = getLocalHapiScheduler(); + } + retval.init(); + return retval; + } + + protected abstract IHapiScheduler getLocalHapiScheduler(); + + protected abstract IHapiScheduler getClusteredScheduler(); + + /** + * We defer startup of executing started tasks until we're sure we're ready for it + * and the startup is completely done + */ + + @Override + public int getPhase() { + return ISmartLifecyclePhase.SCHEDULER_1000; + } + + @Override + public void start() { + try { + ourLog.info("Starting task schedulers for context {}", myApplicationContext.getId()); + if (myLocalScheduler != null) { + myLocalScheduler.start(); + } + if (myClusteredScheduler != null) { + myClusteredScheduler.start(); + } + } catch (Exception e) { + ourLog.error("Failed to start scheduler", e); + throw new ConfigurationException("Failed to start scheduler", e); + } + } + + @Override + public void stop() { + ourLog.info("Shutting down task scheduler..."); + + myStopping.set(true); + myLocalScheduler.shutdown(); + myClusteredScheduler.shutdown(); + } + + @Override + public boolean isRunning() { + return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted(); + } + + @Override + public void purgeAllScheduledJobsForUnitTest() throws SchedulerException { + myLocalScheduler.clear(); + myClusteredScheduler.clear(); + } + + @Override + public void logStatusForUnitTest() { + myLocalScheduler.logStatusForUnitTest(); + myClusteredScheduler.logStatusForUnitTest(); + } + + @Override + public void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + scheduleJob("local", myLocalScheduler, theIntervalMillis, theJobDefinition); + } + + @Override + public void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + scheduleJob("clustered", myClusteredScheduler, theIntervalMillis, theJobDefinition); + } + + private void scheduleJob(String theInstanceName, IHapiScheduler theScheduler, long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + ourLog.info("Scheduling {} job {} with interval {}", theInstanceName, theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis)); + if (theJobDefinition.getGroup() == null) { + theJobDefinition.setGroup(myDefaultGroup); + } + theScheduler.scheduleJob(theIntervalMillis, theJobDefinition); + } + + @VisibleForTesting + @Override + public Set getLocalJobKeysForUnitTest() throws SchedulerException { + return myLocalScheduler.getJobKeysForUnitTest(); + } + + @VisibleForTesting + @Override + public Set getClusteredJobKeysForUnitTest() throws SchedulerException { + return myClusteredScheduler.getJobKeysForUnitTest(); + } + + private boolean isSchedulingDisabledForUnitTests() { + String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED); + return "true".equals(schedulingDisabled); + } + + @Override + public boolean isStopping() { + return myStopping.get(); + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/ClusteredHapiScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/ClusteredHapiScheduler.java new file mode 100644 index 00000000000..d09e7617ca3 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/ClusteredHapiScheduler.java @@ -0,0 +1,8 @@ +package ca.uhn.fhir.jpa.sched; + +public class ClusteredHapiScheduler extends BaseHapiScheduler { + public ClusteredHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) { + super(theThreadNamePrefix, theSpringBeanJobFactory); + setInstanceName("clustered"); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java new file mode 100644 index 00000000000..42bc1571137 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java @@ -0,0 +1,54 @@ +package ca.uhn.fhir.jpa.sched; + +import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; +import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class HapiNullScheduler implements IHapiScheduler { + private static final Logger ourLog = LoggerFactory.getLogger(HapiNullScheduler.class); + + @Override + public void init() { + // nothing + } + + @Override + public void start() { + + } + + @Override + public void shutdown() { + + } + + @Override + public boolean isStarted() { + return true; + } + + @Override + public void clear() throws SchedulerException { + + } + + @Override + public void logStatusForUnitTest() { + + } + + @Override + public void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + ourLog.debug("Skipping scheduling job {} since scheduling is disabled", theJobDefinition.getId()); + } + + @Override + public Set getJobKeysForUnitTest() { + return null; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiSchedulerServiceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiSchedulerServiceImpl.java new file mode 100644 index 00000000000..124269d8f10 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiSchedulerServiceImpl.java @@ -0,0 +1,17 @@ +package ca.uhn.fhir.jpa.sched; + +import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; + +public class HapiSchedulerServiceImpl extends BaseSchedulerServiceImpl { + public static final String THREAD_NAME_PREFIX = "hapi-fhir-jpa-scheduler"; + + @Override + protected IHapiScheduler getLocalHapiScheduler() { + return new LocalHapiScheduler(THREAD_NAME_PREFIX, mySchedulerJobFactory); + } + + @Override + protected IHapiScheduler getClusteredScheduler() { + return new ClusteredHapiScheduler(THREAD_NAME_PREFIX, mySchedulerJobFactory); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/LocalHapiScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/LocalHapiScheduler.java new file mode 100644 index 00000000000..7c7067cf509 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/LocalHapiScheduler.java @@ -0,0 +1,8 @@ +package ca.uhn.fhir.jpa.sched; + +public class LocalHapiScheduler extends BaseHapiScheduler { + public LocalHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) { + super(theThreadNamePrefix, theSpringBeanJobFactory); + setInstanceName("local"); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java deleted file mode 100644 index b996f1f600d..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImpl.java +++ /dev/null @@ -1,579 +0,0 @@ -package ca.uhn.fhir.jpa.sched; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2019 University Health Network - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import ca.uhn.fhir.jpa.model.sched.ISchedulerService; -import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; -import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.Validate; -import org.quartz.Calendar; -import org.quartz.*; -import org.quartz.impl.JobDetailImpl; -import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.matchers.GroupMatcher; -import org.quartz.spi.JobFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.EventListener; -import org.springframework.core.env.Environment; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME; - -/** - * This class provides task scheduling for the entire module using the Quartz library. - * Inside here, we have two schedulers: - *
    - *
  • - * The Local Scheduler handles tasks that need to execute locally. This - * typically means things that should happen on all nodes in a clustered - * environment. - *
  • - *
  • - * The Cluster Scheduler handles tasks that are distributed and should be - * handled by only one node in the cluster (assuming a clustered server). If the - * server is not clustered, this scheduler acts the same way as the - * local scheduler. - *
  • - *
- */ -public class SchedulerServiceImpl implements ISchedulerService { - public static final String SCHEDULING_DISABLED = "scheduling_disabled"; - public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true"; - - private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class); - private static int ourNextSchedulerId = 0; - private Scheduler myLocalScheduler; - private Scheduler myClusteredScheduler; - private String myThreadNamePrefix; - private boolean myLocalSchedulingEnabled; - private boolean myClusteredSchedulingEnabled; - @Autowired - private AutowiringSpringBeanJobFactory mySpringBeanJobFactory; - private AtomicBoolean myStopping = new AtomicBoolean(false); - @Autowired - private Environment myEnvironment; - - /** - * Constructor - */ - public SchedulerServiceImpl() { - setThreadNamePrefix("hapi-fhir-jpa-scheduler"); - setLocalSchedulingEnabled(true); - setClusteredSchedulingEnabled(true); - } - - public boolean isLocalSchedulingEnabled() { - return myLocalSchedulingEnabled; - } - - public void setLocalSchedulingEnabled(boolean theLocalSchedulingEnabled) { - myLocalSchedulingEnabled = theLocalSchedulingEnabled; - } - - public boolean isClusteredSchedulingEnabled() { - return myClusteredSchedulingEnabled; - } - - public void setClusteredSchedulingEnabled(boolean theClusteredSchedulingEnabled) { - myClusteredSchedulingEnabled = theClusteredSchedulingEnabled; - } - - public String getThreadNamePrefix() { - return myThreadNamePrefix; - } - - public void setThreadNamePrefix(String theThreadNamePrefix) { - myThreadNamePrefix = theThreadNamePrefix; - } - - @PostConstruct - public void start() throws SchedulerException { - myLocalScheduler = createLocalScheduler(); - myClusteredScheduler = createClusteredScheduler(); - myStopping.set(false); - } - - /** - * We defer startup of executing started tasks until we're sure we're ready for it - * and the startup is completely done - */ - @EventListener - public void contextStarted(ContextRefreshedEvent theEvent) throws SchedulerException { - try { - ourLog.info("Starting task schedulers for context {}", theEvent != null ? theEvent.getApplicationContext().getId() : "null"); - if (myLocalScheduler != null) { - myLocalScheduler.start(); - } - if (myClusteredScheduler != null) { - myClusteredScheduler.start(); - } - } catch (Exception e) { - ourLog.error("Failed to start context", e); - throw new SchedulerException(e); - } - } - - private Scheduler createLocalScheduler() throws SchedulerException { - if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) { - return new NullScheduler(); - } - Properties localProperties = new Properties(); - localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId++); - quartzPropertiesCommon(localProperties); - quartzPropertiesLocal(localProperties); - StdSchedulerFactory factory = new StdSchedulerFactory(); - factory.initialize(localProperties); - Scheduler scheduler = factory.getScheduler(); - configureSchedulerCommon(scheduler); - scheduler.standby(); - return scheduler; - } - - private Scheduler createClusteredScheduler() throws SchedulerException { - if (!isClusteredSchedulingEnabled() || isSchedulingDisabledForUnitTests()) { - return new NullScheduler(); - } - Properties clusteredProperties = new Properties(); - clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId++); - quartzPropertiesCommon(clusteredProperties); - quartzPropertiesClustered(clusteredProperties); - StdSchedulerFactory factory = new StdSchedulerFactory(); - factory.initialize(clusteredProperties); - Scheduler scheduler = factory.getScheduler(); - configureSchedulerCommon(scheduler); - scheduler.standby(); - return scheduler; - } - - private void configureSchedulerCommon(Scheduler theScheduler) throws SchedulerException { - theScheduler.setJobFactory(mySpringBeanJobFactory); - } - - @PreDestroy - public void stop() throws SchedulerException { - ourLog.info("Shutting down task scheduler..."); - - myStopping.set(true); - myLocalScheduler.shutdown(true); - myClusteredScheduler.shutdown(true); - } - - @Override - public void purgeAllScheduledJobsForUnitTest() throws SchedulerException { - myLocalScheduler.clear(); - myClusteredScheduler.clear(); - } - - @Override - public void logStatus() { - try { - Set keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup()); - String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", ")); - ourLog.info("Local scheduler has jobs: {}", keysString); - - keys = myClusteredScheduler.getJobKeys(GroupMatcher.anyGroup()); - keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", ")); - ourLog.info("Clustered scheduler has jobs: {}", keysString); - } catch (SchedulerException e) { - throw new InternalErrorException(e); - } - } - - @Override - public void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition) { - Validate.isTrue(theIntervalMillis >= 100); - - Validate.notNull(theJobDefinition); - Validate.notNull(theJobDefinition.getJobClass()); - Validate.notBlank(theJobDefinition.getId()); - - JobKey jobKey = new JobKey(theJobDefinition.getId()); - - JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl(); - jobDetail.setJobClass(theJobDefinition.getJobClass()); - jobDetail.setKey(jobKey); - jobDetail.setName(theJobDefinition.getId()); - jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData())); - - ScheduleBuilder schedule = SimpleScheduleBuilder - .simpleSchedule() - .withIntervalInMilliseconds(theIntervalMillis) - .repeatForever(); - - Trigger trigger = TriggerBuilder.newTrigger() - .forJob(jobDetail) - .startNow() - .withSchedule(schedule) - .build(); - - Set triggers = Sets.newHashSet(trigger); - try { - Scheduler scheduler; - if (theClusteredTask) { - scheduler = myClusteredScheduler; - } else { - scheduler = myLocalScheduler; - } - scheduler.scheduleJob(jobDetail, triggers, true); - } catch (SchedulerException e) { - ourLog.error("Failed to schedule job", e); - throw new InternalErrorException(e); - } - - } - - @Override - public boolean isStopping() { - return myStopping.get(); - } - - /** - * Properties for the local scheduler (see the class docs to learn what this means) - */ - protected void quartzPropertiesLocal(Properties theProperties) { - // nothing - } - - /** - * Properties for the cluster scheduler (see the class docs to learn what this means) - */ - protected void quartzPropertiesClustered(Properties theProperties) { -// theProperties.put("org.quartz.jobStore.tablePrefix", "QRTZHFJC_"); - } - - protected void quartzPropertiesCommon(Properties theProperties) { - theProperties.put("org.quartz.threadPool.threadCount", "4"); - theProperties.put("org.quartz.threadPool.threadNamePrefix", getThreadNamePrefix() + "-" + theProperties.get(PROP_SCHED_INSTANCE_NAME)); - } - - private boolean isSchedulingDisabledForUnitTests() { - String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED); - return "true".equals(schedulingDisabled); - } - - private static class NonConcurrentJobDetailImpl extends JobDetailImpl { - private static final long serialVersionUID = 5716197221121989740L; - - // All HAPI FHIR jobs shouldn't allow concurrent execution - @Override - public boolean isConcurrentExectionDisallowed() { - return true; - } - } - - private static class NullScheduler implements Scheduler { - @Override - public String getSchedulerName() { - return null; - } - - @Override - public String getSchedulerInstanceId() { - return null; - } - - @Override - public SchedulerContext getContext() { - return null; - } - - @Override - public void start() { - - } - - @Override - public void startDelayed(int seconds) { - - } - - @Override - public boolean isStarted() { - return false; - } - - @Override - public void standby() { - - } - - @Override - public boolean isInStandbyMode() { - return false; - } - - @Override - public void shutdown() { - - } - - @Override - public void shutdown(boolean waitForJobsToComplete) { - - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public SchedulerMetaData getMetaData() { - return null; - } - - @Override - public List getCurrentlyExecutingJobs() { - return null; - } - - @Override - public void setJobFactory(JobFactory factory) { - - } - - @Override - public ListenerManager getListenerManager() { - return null; - } - - @Override - public Date scheduleJob(JobDetail jobDetail, Trigger trigger) { - return null; - } - - @Override - public Date scheduleJob(Trigger trigger) { - return null; - } - - @Override - public void scheduleJobs(Map> triggersAndJobs, boolean replace) { - - } - - @Override - public void scheduleJob(JobDetail jobDetail, Set triggersForJob, boolean replace) { - - } - - @Override - public boolean unscheduleJob(TriggerKey triggerKey) { - return false; - } - - @Override - public boolean unscheduleJobs(List triggerKeys) { - return false; - } - - @Override - public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) { - return null; - } - - @Override - public void addJob(JobDetail jobDetail, boolean replace) { - - } - - @Override - public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) { - - } - - @Override - public boolean deleteJob(JobKey jobKey) { - return false; - } - - @Override - public boolean deleteJobs(List jobKeys) { - return false; - } - - @Override - public void triggerJob(JobKey jobKey) { - - } - - @Override - public void triggerJob(JobKey jobKey, JobDataMap data) { - - } - - @Override - public void pauseJob(JobKey jobKey) { - - } - - @Override - public void pauseJobs(GroupMatcher matcher) { - - } - - @Override - public void pauseTrigger(TriggerKey triggerKey) { - - } - - @Override - public void pauseTriggers(GroupMatcher matcher) { - - } - - @Override - public void resumeJob(JobKey jobKey) { - - } - - @Override - public void resumeJobs(GroupMatcher matcher) { - - } - - @Override - public void resumeTrigger(TriggerKey triggerKey) { - - } - - @Override - public void resumeTriggers(GroupMatcher matcher) { - - } - - @Override - public void pauseAll() { - - } - - @Override - public void resumeAll() { - - } - - @Override - public List getJobGroupNames() { - return null; - } - - @Override - public Set getJobKeys(GroupMatcher matcher) { - return null; - } - - @Override - public List getTriggersOfJob(JobKey jobKey) { - return null; - } - - @Override - public List getTriggerGroupNames() { - return null; - } - - @Override - public Set getTriggerKeys(GroupMatcher matcher) { - return null; - } - - @Override - public Set getPausedTriggerGroups() { - return null; - } - - @Override - public JobDetail getJobDetail(JobKey jobKey) { - return null; - } - - @Override - public Trigger getTrigger(TriggerKey triggerKey) { - return null; - } - - @Override - public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) { - return null; - } - - @Override - public void resetTriggerFromErrorState(TriggerKey triggerKey) { - - } - - @Override - public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) { - - } - - @Override - public boolean deleteCalendar(String calName) { - return false; - } - - @Override - public Calendar getCalendar(String calName) { - return null; - } - - @Override - public List getCalendarNames() { - return null; - } - - @Override - public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException { - return false; - } - - @Override - public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException { - return false; - } - - @Override - public boolean checkExists(JobKey jobKey) { - return false; - } - - @Override - public boolean checkExists(TriggerKey triggerKey) { - return false; - } - - @Override - public void clear() { - - } - } - - -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java index af25e9a0eea..1f19823e2c6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java @@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.search; */ import ca.uhn.fhir.jpa.dao.DaoConfig; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Propagation; @@ -58,11 +58,21 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(StaleSearchDeletingSvcImpl.class.getName()); - jobDetail.setJobClass(StaleSearchDeletingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(DEFAULT_CUTOFF_SLACK, true, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(DEFAULT_CUTOFF_SLACK, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private IStaleSearchDeletingSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.schedulePollForStaleSearches(); + } } @Transactional(propagation = Propagation.NEVER) @@ -72,14 +82,4 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { pollForStaleSearchesAndDeleteThem(); } } - - public static class SubmitJob implements Job { - @Autowired - private IStaleSearchDeletingSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.schedulePollForStaleSearches(); - } - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java index dfa11bfaa0a..f9bdf94df79 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/cache/BaseSearchCacheSvcImpl.java @@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.search.cache; */ import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import org.apache.commons.lang3.time.DateUtils; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.PlatformTransactionManager; @@ -51,11 +51,11 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(BaseSearchCacheSvcImpl.class.getName()); - jobDetail.setJobClass(BaseSearchCacheSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); } @Override @@ -73,7 +73,7 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc { protected abstract void flushLastUpdated(Long theSearchId, Date theLastUpdated); - public static class SubmitJob implements Job { + public static class Job implements HapiJob { @Autowired private ISearchCacheSvc myTarget; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java index ef98b9dbdca..deb7ab62ef1 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImpl.java @@ -33,6 +33,7 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity; import ca.uhn.fhir.jpa.model.entity.ForcedId; import ca.uhn.fhir.jpa.model.entity.ResourceTable; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; @@ -46,7 +47,6 @@ import org.apache.commons.lang3.time.DateUtils; import org.hibernate.search.util.impl.Executors; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r4.model.InstantType; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,10 +142,16 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc { myContext = theContext; } + @VisibleForTesting + void setSchedulerServiceForUnitTest(ISchedulerService theSchedulerService) { + mySchedulerService = theSchedulerService; + } + @PostConstruct public void start() { myTxTemplate = new TransactionTemplate(myTxManager); initExecutor(); + scheduleJob(); } public void initExecutor() { @@ -160,6 +166,13 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc { ); } + public void scheduleJob() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); + } + @Override @Transactional(Transactional.TxType.REQUIRED) public Long markAllResourcesForReindexing() { @@ -187,12 +200,14 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc { return job.getId(); } - @PostConstruct - public void registerScheduledJob() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(ResourceReindexingSvcImpl.class.getName()); - jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, true, jobDetail); + public static class Job implements HapiJob { + @Autowired + private IResourceReindexingSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.runReindexingPass(); + } } @VisibleForTesting @@ -545,14 +560,4 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc { return myUpdated; } } - - public static class SubmitJob implements Job { - @Autowired - private IResourceReindexingSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.runReindexingPass(); - } - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java index 9cbf950be53..84c6df873c4 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java @@ -26,16 +26,14 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.util.UrlUtil; import org.apache.commons.lang3.time.DateUtils; -import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; -import org.quartz.PersistJobDataAfterExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -47,7 +45,6 @@ import java.util.*; @Component public class CacheWarmingSvcImpl implements ICacheWarmingSvc { - public static final long SCHEDULED_JOB_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(CacheWarmingSvcImpl.class); @Autowired private DaoConfig myDaoConfig; @@ -83,14 +80,6 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { } - @PostConstruct - public void registerScheduledJob() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(CacheWarmingSvcImpl.class.getName()); - jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULED_JOB_INTERVAL, true, jobDetail); - } - private void refreshNow(WarmCacheEntry theCacheEntry) { String nextUrl = theCacheEntry.getUrl(); @@ -113,6 +102,24 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { @PostConstruct public void start() { initCacheMap(); + scheduleJob(); + } + + public void scheduleJob() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private ICacheWarmingSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.performWarmingPass(); + } } public synchronized Set initCacheMap() { @@ -129,22 +136,5 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { } return Collections.unmodifiableSet(myCacheEntryToNextRefresh.keySet()); - - } - - @DisallowConcurrentExecution - @PersistJobDataAfterExecution - public static class SubmitJob extends FireAtIntervalJob { - @Autowired - private ICacheWarmingSvc myTarget; - - public SubmitJob() { - super(SCHEDULED_JOB_INTERVAL); - } - - @Override - protected void doExecute(JobExecutionContext theContext) { - myTarget.performWarmingPass(); - } } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java index 756ad775c24..ab0d21cbdf7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java @@ -26,7 +26,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; @@ -55,9 +55,7 @@ import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; -import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; -import org.quartz.PersistJobDataAfterExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -77,7 +75,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; @Service public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc { - public static final long SCHEDULE_DELAY = DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class); private static final int DEFAULT_MAX_SUBMIT = 10000; private final List myActiveJobs = new ArrayList<>(); @@ -159,14 +156,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc return retVal; } - @PostConstruct - public void registerScheduledJob() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName()); - jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_DELAY, false, jobDetail); - } - @Override public void runDeliveryPass() { @@ -356,6 +345,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc @PostConstruct public void start() { + createExecutorService(); + scheduleJob(); + } + + private void createExecutorService() { LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(1000); BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() .namingPattern("SubscriptionTriggering-%d") @@ -386,21 +380,21 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc executorQueue, threadFactory, rejectedExecutionHandler); - } - @DisallowConcurrentExecution - @PersistJobDataAfterExecution - public static class SubmitJob extends FireAtIntervalJob { + private void scheduleJob() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_SECOND, jobDetail); + } + + public static class Job implements HapiJob { @Autowired private ISubscriptionTriggeringSvc myTarget; - public SubmitJob() { - super(SCHEDULE_DELAY); - } - @Override - protected void doExecute(JobExecutionContext theContext) { + public void execute(JobExecutionContext theContext) { myTarget.runDeliveryPass(); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java index 315162bfc85..7aa359816c7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java @@ -23,18 +23,14 @@ package ca.uhn.fhir.jpa.term; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.support.IContextValidationSupport; -import ca.uhn.fhir.jpa.dao.DaoConfig; -import ca.uhn.fhir.jpa.dao.DaoRegistry; -import ca.uhn.fhir.jpa.dao.IDao; -import ca.uhn.fhir.jpa.dao.IFhirResourceDaoCodeSystem; -import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet; +import ca.uhn.fhir.jpa.dao.*; import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet.ValidateCodeResult; -import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; import ca.uhn.fhir.jpa.dao.data.*; import ca.uhn.fhir.jpa.entity.*; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum; import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId; import ca.uhn.fhir.jpa.model.entity.ResourceTable; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; @@ -60,11 +56,7 @@ import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.time.DateUtils; import org.apache.lucene.index.Term; import org.apache.lucene.queries.TermsQuery; -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.MultiPhraseQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.RegexpQuery; +import org.apache.lucene.search.*; import org.hibernate.ScrollMode; import org.hibernate.ScrollableResults; import org.hibernate.search.jpa.FullTextEntityManager; @@ -78,7 +70,6 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.*; import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; @@ -101,21 +92,14 @@ import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import javax.persistence.PersistenceContextType; import javax.persistence.TypedQuery; -import javax.persistence.criteria.CriteriaBuilder; -import javax.persistence.criteria.CriteriaQuery; -import javax.persistence.criteria.Join; -import javax.persistence.criteria.Predicate; -import javax.persistence.criteria.Root; +import javax.persistence.criteria.*; import javax.validation.constraints.NotNull; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.defaultString; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNoneBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.*; public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationContextAware { public static final int DEFAULT_FETCH_SIZE = 250; @@ -222,23 +206,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo return retVal; } - @PostConstruct - public void buildTranslationCaches() { - Long timeout = myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes(); - - myTranslationCache = - Caffeine.newBuilder() - .maximumSize(10000) - .expireAfterWrite(timeout, TimeUnit.MINUTES) - .build(); - - myTranslationWithReverseCache = - Caffeine.newBuilder() - .maximumSize(10000) - .expireAfterWrite(timeout, TimeUnit.MINUTES) - .build(); - } - /** * This method is present only for unit tests, do not call from client code */ @@ -1316,16 +1283,44 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo RuleBasedTransactionAttribute rules = new RuleBasedTransactionAttribute(); rules.getRollbackRules().add(new NoRollbackRuleAttribute(ExpansionTooCostlyException.class)); myTxTemplate = new TransactionTemplate(myTransactionManager, rules); + buildTranslationCaches(); + scheduleJob(); } - @PostConstruct - public void registerScheduledJob() { + private void buildTranslationCaches() { + Long timeout = myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes(); + + myTranslationCache = + Caffeine.newBuilder() + .maximumSize(10000) + .expireAfterWrite(timeout, TimeUnit.MINUTES) + .build(); + + myTranslationWithReverseCache = + Caffeine.newBuilder() + .maximumSize(10000) + .expireAfterWrite(timeout, TimeUnit.MINUTES) + .build(); + } + + public void scheduleJob() { + // TODO KHS what does this mean? // Register scheduled job to pre-expand ValueSets // In the future it would be great to make this a cluster-aware task somehow ScheduledJobDefinition vsJobDefinition = new ScheduledJobDefinition(); - vsJobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_preExpandValueSets"); - vsJobDefinition.setJobClass(PreExpandValueSetsJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, true, vsJobDefinition); + vsJobDefinition.setId(getClass().getName()); + vsJobDefinition.setJobClass(Job.class); + mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_MINUTE, vsJobDefinition); + } + + public static class Job implements HapiJob { + @Autowired + private ITermReadSvc myTerminologySvc; + + @Override + public void execute(JobExecutionContext theContext) { + myTerminologySvc.preExpandDeferredValueSetsToTerminologyTables(); + } } @Override @@ -1881,17 +1876,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo .findFirst(); } - public static class PreExpandValueSetsJob implements Job { - - @Autowired - private ITermReadSvc myTerminologySvc; - - @Override - public void execute(JobExecutionContext theContext) { - myTerminologySvc.preExpandDeferredValueSetsToTerminologyTables(); - } - } - static List toPersistedConcepts(List theConcept, TermCodeSystemVersion theCodeSystemVersion) { ArrayList retVal = new ArrayList<>(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index 247b3acfe9c..f0a70b2e045 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -25,7 +25,7 @@ import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao; import ca.uhn.fhir.jpa.entity.TermConcept; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; @@ -53,7 +53,6 @@ import java.util.List; public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { - private static final int SCHEDULE_INTERVAL_MILLIS = 5000; private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class); @Autowired protected ITermConceptDao myConceptDao; @@ -260,13 +259,24 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { + // TODO KHS what does this mean? // Register scheduled job to save deferred concepts // In the future it would be great to make this a cluster-aware task somehow ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); - jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred"); - jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); + jobDefinition.setId(this.getClass().getName()); + jobDefinition.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(5000, jobDefinition); + } + + public static class Job implements HapiJob { + @Autowired + private ITermDeferredStorageSvc myTerminologySvc; + + @Override + public void execute(JobExecutionContext theContext) { + myTerminologySvc.saveDeferred(); + } } @VisibleForTesting @@ -288,23 +298,4 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) { myConceptDao = theConceptDao; } - - public static class SaveDeferredJob extends FireAtIntervalJob { - - @Autowired - private ITermDeferredStorageSvc myTerminologySvc; - - /** - * Constructor - */ - public SaveDeferredJob() { - super(SCHEDULE_INTERVAL_MILLIS); - } - - @Override - protected void doExecute(JobExecutionContext theContext) { - myTerminologySvc.saveDeferred(); - } - } - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java index cbc7b9b4e92..9733c0cfc0b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java @@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.term; import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao; import ca.uhn.fhir.jpa.entity.TermConcept; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; @@ -54,7 +54,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class TermReindexingSvcImpl implements ITermReindexingSvc { private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class); - private static final long SCHEDULE_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; private static boolean ourForceSaveDeferredAlwaysForUnitTest; @Autowired protected ITermConceptDao myConceptDao; @@ -150,29 +149,22 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { + // TODO KHS what does this mean? // Register scheduled job to save deferred concepts // In the future it would be great to make this a cluster-aware task somehow ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); - jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex"); - jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition); + jobDefinition.setId(this.getClass().getName()); + jobDefinition.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition); } - public static class SaveDeferredJob extends FireAtIntervalJob { - + public static class Job implements HapiJob { @Autowired private ITermDeferredStorageSvc myTerminologySvc; - /** - * Constructor - */ - public SaveDeferredJob() { - super(SCHEDULE_INTERVAL_MILLIS); - } - @Override - protected void doExecute(JobExecutionContext theContext) { + public void execute(JobExecutionContext theContext) { myTerminologySvc.saveDeferred(); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java index 843736034fd..c5cf0d82294 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/ResourceCountCache.java @@ -20,12 +20,12 @@ package ca.uhn.fhir.jpa.util; * #L% */ +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.time.DateUtils; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,14 +92,14 @@ public class ResourceCountCache { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(ResourceCountCache.class.getName()); - jobDetail.setJobClass(ResourceCountCache.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, false, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_MINUTE, jobDetail); } - public static class SubmitJob implements Job { + public static class Job implements HapiJob { @Autowired private ResourceCountCache myTarget; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java index 8409ea0c049..de008af9646 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/sched/SchedulerServiceImplTest.java @@ -1,16 +1,18 @@ package ca.uhn.fhir.jpa.sched; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.quartz.*; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.BeanUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -18,7 +20,6 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.util.ProxyUtils; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.util.AopTestUtils; @@ -50,7 +51,7 @@ public class SchedulerServiceImplTest { .setId(CountingJob.class.getName()) .setJobClass(CountingJob.class); - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleLocalJob(100, def); sleepAtLeast(1000); @@ -67,12 +68,12 @@ public class SchedulerServiceImplTest { .setId(CountingJob.class.getName()) .setJobClass(CountingJob.class); - SchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc); + BaseSchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc); svc.stop(); + svc.create(); svc.start(); - svc.contextStarted(null); - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleLocalJob(100, def); sleepAtLeast(1000); @@ -90,7 +91,7 @@ public class SchedulerServiceImplTest { .setJobClass(CountingJob.class); ourTaskDelay = 500; - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleLocalJob(100, def); sleepAtLeast(1000); @@ -108,7 +109,7 @@ public class SchedulerServiceImplTest { .setJobClass(CountingIntervalJob.class); ourTaskDelay = 500; - mySvc.scheduleFixedDelay(100, false, def); + mySvc.scheduleLocalJob(100, def); sleepAtLeast(2000); @@ -159,10 +160,7 @@ public class SchedulerServiceImplTest { } } - - @DisallowConcurrentExecution - @PersistJobDataAfterExecution - public static class CountingIntervalJob extends FireAtIntervalJob { + public static class CountingIntervalJob implements HapiJob { private static int ourCount; @@ -171,26 +169,20 @@ public class SchedulerServiceImplTest { private String myStringBean; private ApplicationContext myAppCtx; - public CountingIntervalJob() { - super(500); - } - @Override - public void doExecute(JobExecutionContext theContext) { + public void execute(JobExecutionContext theContext) { ourLog.info("Job has fired, going to sleep for {}ms", ourTaskDelay); sleepAtLeast(ourTaskDelay); ourCount++; } - } - @Configuration public static class TestConfiguration { @Bean public ISchedulerService schedulerService() { - return new SchedulerServiceImpl(); + return new HapiSchedulerServiceImpl(); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java index 1a3e3fd7efe..67e050ebcaa 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java @@ -19,7 +19,6 @@ import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.util.TestUtil; -import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.junit.After; import org.junit.AfterClass; @@ -157,6 +156,7 @@ public class SearchCoordinatorSvcImplTest { } + // TODO INTERMITTENT this test fails intermittently @Test public void testAsyncSearchLargeResultSetBigCountSameCoordinator() { List allResults = new ArrayList<>(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImplTest.java index df545aa87fe..626957b7a3d 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/reindex/ResourceReindexingSvcImplTest.java @@ -1,16 +1,13 @@ package ca.uhn.fhir.jpa.search.reindex; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; -import ca.uhn.fhir.jpa.dao.BaseJpaTest; -import ca.uhn.fhir.jpa.dao.DaoConfig; -import ca.uhn.fhir.jpa.dao.DaoRegistry; -import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.dao.*; import ca.uhn.fhir.jpa.dao.data.IForcedIdDao; import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity; import ca.uhn.fhir.jpa.model.entity.ResourceTable; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -72,6 +69,8 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest { private ISearchParamRegistry mySearchParamRegistry; @Mock private TransactionStatus myTxStatus; + @Mock + private ISchedulerService mySchedulerService; @Override protected FhirContext getContext() { @@ -97,6 +96,7 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest { mySvc.setResourceTableDaoForUnitTest(myResourceTableDao); mySvc.setTxManagerForUnitTest(myTxManager); mySvc.setSearchParamRegistryForUnitTest(mySearchParamRegistry); + mySvc.setSchedulerServiceForUnitTest(mySchedulerService); mySvc.start(); when(myTxManager.getTransaction(any())).thenReturn(myTxStatus); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java index 1cfc1e5896f..6d1d9169235 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java @@ -3,11 +3,11 @@ package ca.uhn.fhir.jpa.subscription.resthook; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil; import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl; -import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.Update; @@ -16,6 +16,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.test.utilities.JettyUtil; import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -34,8 +35,6 @@ import java.util.List; import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.*; -import ca.uhn.fhir.test.utilities.JettyUtil; - /** * Test the rest-hook subscriptions */ @@ -101,7 +100,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourUpdatedPatients.clear(); ourContentTypes.clear(); - mySchedulerService.logStatus(); + mySchedulerService.logStatusForUnitTest(); } private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { diff --git a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/SchemaInitializationProvider.java b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/SchemaInitializationProvider.java index 633a26f41e3..30db7c29569 100644 --- a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/SchemaInitializationProvider.java +++ b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/SchemaInitializationProvider.java @@ -27,18 +27,21 @@ import com.google.common.base.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; +import javax.annotation.Nonnull; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import static org.apache.commons.lang3.StringUtils.isBlank; + public class SchemaInitializationProvider implements ISchemaInitializationProvider { + private final String mySchemaFileClassPath; private final String mySchemaExistsIndicatorTable; /** - * - * @param theSchemaFileClassPath pathname to script used to initialize schema + * @param theSchemaFileClassPath pathname to script used to initialize schema * @param theSchemaExistsIndicatorTable a table name we can use to determine if this schema has already been initialized */ public SchemaInitializationProvider(String theSchemaFileClassPath, String theSchemaExistsIndicatorTable) { @@ -50,18 +53,20 @@ public class SchemaInitializationProvider implements ISchemaInitializationProvid public List getSqlStatements(DriverTypeEnum theDriverType) { List retval = new ArrayList<>(); - String initScript; - initScript = mySchemaFileClassPath + "/" + theDriverType.getSchemaFilename(); + String initScript = mySchemaFileClassPath + "/" + getInitScript(theDriverType); try { InputStream sqlFileInputStream = SchemaInitializationProvider.class.getResourceAsStream(initScript); if (sqlFileInputStream == null) { throw new ConfigurationException("Schema initialization script " + initScript + " not found on classpath"); } // Assumes no escaped semicolons... - String[] statements = IOUtils.toString(sqlFileInputStream, Charsets.UTF_8).split("\\;"); + String sqlString = IOUtils.toString(sqlFileInputStream, Charsets.UTF_8); + String sqlStringNoComments = preProcessSqlString(theDriverType, sqlString); + String[] statements = sqlStringNoComments.split("\\;"); for (String statement : statements) { - if (!statement.trim().isEmpty()) { - retval.add(statement); + String cleanedStatement = preProcessSqlStatement(theDriverType, statement); + if (!isBlank(cleanedStatement)) { + retval.add(cleanedStatement); } } } catch (IOException e) { @@ -70,6 +75,19 @@ public class SchemaInitializationProvider implements ISchemaInitializationProvid return retval; } + protected String preProcessSqlString(DriverTypeEnum theDriverType, String sqlString) { + return sqlString; + } + + protected String preProcessSqlStatement(DriverTypeEnum theDriverType, String sqlStatement) { + return sqlStatement; + } + + @Nonnull + protected String getInitScript(DriverTypeEnum theDriverType) { + return theDriverType.getSchemaFilename(); + } + @Override public boolean equals(Object theO) { if (this == theO) return true; @@ -93,3 +111,4 @@ public class SchemaInitializationProvider implements ISchemaInitializationProvid return mySchemaExistsIndicatorTable; } } + diff --git a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/api/Builder.java b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/api/Builder.java index b732c5c8b2e..c8ec7748c99 100644 --- a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/api/Builder.java +++ b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/api/Builder.java @@ -57,6 +57,13 @@ public class Builder { return this; } + public Builder initializeSchema(String theVersion, String theSchemaName, ISchemaInitializationProvider theSchemaInitializationProvider) { + InitializeSchemaTask task = new InitializeSchemaTask(myRelease, theVersion, theSchemaInitializationProvider); + task.setDescription("Initialize " + theSchemaName + " schema"); + mySink.addTask(task); + return this; + } + public Builder executeRawSql(String theVersion, DriverTypeEnum theDriver, @Language("SQL") String theSql) { mySink.addTask(new ExecuteRawSqlTask(myRelease, theVersion).addSql(theDriver, theSql)); return this; diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJob.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJob.java deleted file mode 100644 index 52e787c54c6..00000000000 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJob.java +++ /dev/null @@ -1,65 +0,0 @@ -package ca.uhn.fhir.jpa.model.sched; - -/*- - * #%L - * HAPI FHIR Model - * %% - * Copyright (C) 2014 - 2019 University Health Network - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.PersistJobDataAfterExecution; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@DisallowConcurrentExecution -@PersistJobDataAfterExecution -public abstract class FireAtIntervalJob implements Job { - - public static final String NEXT_EXECUTION_TIME = "NEXT_EXECUTION_TIME"; - private static final Logger ourLog = LoggerFactory.getLogger(FireAtIntervalJob.class); - private final long myMillisBetweenExecutions; - - public FireAtIntervalJob(long theMillisBetweenExecutions) { - myMillisBetweenExecutions = theMillisBetweenExecutions; - } - - @Override - public final void execute(JobExecutionContext theContext) { - Long nextExecution = (Long) theContext.getJobDetail().getJobDataMap().get(NEXT_EXECUTION_TIME); - - if (nextExecution != null) { - long cutoff = System.currentTimeMillis(); - if (nextExecution >= cutoff) { - return; - } - } - - try { - doExecute(theContext); - } catch (Throwable t) { - ourLog.error("Job threw uncaught exception", t); - } finally { - long newNextExecution = System.currentTimeMillis() + myMillisBetweenExecutions; - theContext.getJobDetail().getJobDataMap().put(NEXT_EXECUTION_TIME, newNextExecution); - } - } - - protected abstract void doExecute(JobExecutionContext theContext); - -} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/QuartzTableSeeder.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java similarity index 62% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/QuartzTableSeeder.java rename to hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java index 49ee1dec51f..a901b648eb3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/QuartzTableSeeder.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java @@ -1,8 +1,8 @@ -package ca.uhn.fhir.jpa.sched; +package ca.uhn.fhir.jpa.model.sched; /*- * #%L - * HAPI FHIR JPA Server + * HAPI FHIR Model * %% * Copyright (C) 2014 - 2019 University Health Network * %% @@ -20,19 +20,9 @@ package ca.uhn.fhir.jpa.sched; * #L% */ -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; - -import javax.annotation.PostConstruct; - -public class QuartzTableSeeder { - - @Autowired - private LocalContainerEntityManagerFactoryBean myEntityManagerFactory; - - @PostConstruct - public void start() { - - } +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +@DisallowConcurrentExecution +public interface HapiJob extends Job { } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java new file mode 100644 index 00000000000..4ce9351369d --- /dev/null +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java @@ -0,0 +1,24 @@ +package ca.uhn.fhir.jpa.model.sched; + +import org.quartz.JobKey; +import org.quartz.SchedulerException; + +import java.util.Set; + +public interface IHapiScheduler { + void init() throws SchedulerException; + + void start(); + + void shutdown(); + + boolean isStarted(); + + void clear() throws SchedulerException; + + void logStatusForUnitTest(); + + void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + + Set getJobKeysForUnitTest() throws SchedulerException; +} diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java index 599b12a9ead..656305483f3 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java @@ -21,21 +21,37 @@ package ca.uhn.fhir.jpa.model.sched; */ import com.google.common.annotations.VisibleForTesting; +import org.quartz.JobKey; import org.quartz.SchedulerException; +import java.util.Set; + public interface ISchedulerService { @VisibleForTesting void purgeAllScheduledJobsForUnitTest() throws SchedulerException; - void logStatus(); + void logStatusForUnitTest(); /** + * Only one instance of this task will fire across the whole cluster (when running in a clustered environment). * @param theIntervalMillis How many milliseconds between passes should this job run - * @param theClusteredTask If true, only one instance of this task will fire across the whole cluster (when running in a clustered environment). If false, or if not running in a clustered environment, this task will execute locally (and should execute on all nodes of the cluster) * @param theJobDefinition The Job to fire */ - void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition); + void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + + /** + * This task will execute locally (and should execute on all nodes of the cluster if there is a cluster) + * @param theIntervalMillis How many milliseconds between passes should this job run + * @param theJobDefinition The Job to fire + */ + void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + + @VisibleForTesting + Set getLocalJobKeysForUnitTest() throws SchedulerException; + + @VisibleForTesting + Set getClusteredJobKeysForUnitTest() throws SchedulerException; boolean isStopping(); } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISmartLifecyclePhase.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISmartLifecyclePhase.java new file mode 100644 index 00000000000..51b58964c10 --- /dev/null +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISmartLifecyclePhase.java @@ -0,0 +1,10 @@ +package ca.uhn.fhir.jpa.model.sched; + +public interface ISmartLifecyclePhase { + // POST_CONSTRUCT is here as a marker for where post-construct fits into the smart lifecycle. Beans with negative phases + // will be started before @PostConstruct are called + int POST_CONSTRUCT = 0; + + // We want to start scheduled tasks fairly late in the startup process + int SCHEDULER_1000 = 1000; +} diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java index caa28b0ab32..245dec8721b 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java @@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.model.sched; */ import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.quartz.Job; import java.util.Collections; @@ -28,10 +29,9 @@ import java.util.HashMap; import java.util.Map; public class ScheduledJobDefinition { - - private Class myJobClass; private String myId; + private String myGroup; private Map myJobData; public Map getJobData() { @@ -60,6 +60,15 @@ public class ScheduledJobDefinition { return this; } + public String getGroup() { + return myGroup; + } + + public ScheduledJobDefinition setGroup(String theGroup) { + myGroup = theGroup; + return this; + } + public void addJobData(String thePropertyName, String thePropertyValue) { Validate.notBlank(thePropertyName); if (myJobData == null) { @@ -68,4 +77,13 @@ public class ScheduledJobDefinition { Validate.isTrue(myJobData.containsKey(thePropertyName) == false); myJobData.put(thePropertyName, thePropertyValue); } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("myJobClass", myJobClass) + .append("myId", myId) + .append("myGroup", myGroup) + .toString(); + } } diff --git a/hapi-fhir-jpaserver-model/src/test/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJobTest.java b/hapi-fhir-jpaserver-model/src/test/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJobTest.java deleted file mode 100644 index 6fe7e79c27e..00000000000 --- a/hapi-fhir-jpaserver-model/src/test/java/ca/uhn/fhir/jpa/model/sched/FireAtIntervalJobTest.java +++ /dev/null @@ -1,40 +0,0 @@ -package ca.uhn.fhir.jpa.model.sched; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Answers; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.quartz.JobExecutionContext; - -import static ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob.NEXT_EXECUTION_TIME; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -@RunWith(MockitoJUnitRunner.class) -public class FireAtIntervalJobTest { - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private JobExecutionContext myJobExecutionContext; - - @Test - public void testExecutionThrowsException() { - - FireAtIntervalJob job = new FireAtIntervalJob(1000) { - @Override - protected void doExecute(JobExecutionContext theContext) { - throw new NullPointerException(); - } - }; - - // No exception thrown please - job.execute(myJobExecutionContext); - - verify(myJobExecutionContext.getJobDetail().getJobDataMap(), times(1)).put(eq(NEXT_EXECUTION_TIME), anyLong()); - - } - - -} diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java index d9875583e24..78f55680b37 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImpl.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.search.StorageProcessingMessage; @@ -47,27 +48,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.dstu3.model.Extension; import org.hl7.fhir.dstu3.model.SearchParameter; -import org.hl7.fhir.instance.model.api.IBaseExtension; -import org.hl7.fhir.instance.model.api.IBaseHasExtensions; -import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.instance.model.api.IIdType; -import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.instance.model.api.*; import org.hl7.fhir.r4.model.Reference; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -735,11 +724,21 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(SearchParamRegistryImpl.class.getName()); - jobDetail.setJobClass(SubmitJob.class); - mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private ISearchParamRegistry myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.refreshCacheIfNecessary(); + } } @Override @@ -778,17 +777,6 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry { } } - - public static class SubmitJob implements Job { - @Autowired - private ISearchParamRegistry myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.refreshCacheIfNecessary(); - } - } - public static Map> createBuiltInSearchParamMap(FhirContext theFhirContext) { Map> resourceNameToSearchParams = new HashMap<>(); diff --git a/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImplTest.java b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImplTest.java index 4cb71863129..c9b8f817b50 100644 --- a/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImplTest.java +++ b/hapi-fhir-jpaserver-searchparam/src/test/java/ca/uhn/fhir/jpa/searchparam/registry/SearchParamRegistryImplTest.java @@ -6,8 +6,6 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import org.hamcrest.Matchers; -import org.hl7.fhir.instance.model.api.IBaseDatatype; -import org.hl7.fhir.instance.model.api.IBaseExtension; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.Enumerations; import org.hl7.fhir.r4.model.SearchParameter; @@ -21,7 +19,8 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.Map; import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java index 1f5e4ae998b..a575df4bb3d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java @@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.jpa.api.IDaoRegistry; -import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; @@ -33,9 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r4.model.Subscription; -import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; -import org.quartz.PersistJobDataAfterExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -52,7 +50,6 @@ import java.util.concurrent.Semaphore; @Service @Lazy public class SubscriptionLoader { - public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes private final Object mySyncSubscriptionsLock = new Object(); @@ -90,11 +87,21 @@ public class SubscriptionLoader { @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(SubscriptionLoader.class.getName()); - jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class); - mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, false, jobDetail); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private SubscriptionLoader myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.syncSubscriptions(); + } } @VisibleForTesting @@ -159,21 +166,5 @@ public class SubscriptionLoader { public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { mySubscriptionProvider = theSubscriptionProvider; } - - @DisallowConcurrentExecution - @PersistJobDataAfterExecution - public static class SubmitJob extends FireAtIntervalJob { - @Autowired - private SubscriptionLoader myTarget; - - public SubmitJob() { - super(REFRESH_INTERVAL); - } - - @Override - protected void doExecute(JobExecutionContext theContext) { - myTarget.syncSubscriptions(); - } - } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java index a4238230f12..bd833138922 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java @@ -4,7 +4,10 @@ import ca.uhn.fhir.jpa.api.IDaoRegistry; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionProvider; -import org.springframework.context.annotation.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java index b21b6b27050..ddc5687d799 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/interceptor/AnalyticsInterceptor.java @@ -1,5 +1,6 @@ package ca.uhn.fhirtest.interceptor; +import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; @@ -13,7 +14,6 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; @@ -55,7 +55,15 @@ public class AnalyticsInterceptor extends InterceptorAdapter { } } - public static class SubmitJob implements Job { + @PostConstruct + public void start() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleLocalJob(5000, jobDetail); + } + + public static class Job implements HapiJob { @Autowired private AnalyticsInterceptor myAnalyticsInterceptor; @@ -65,14 +73,6 @@ public class AnalyticsInterceptor extends InterceptorAdapter { } } - @PostConstruct - public void start() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(getClass().getName()); - jobDetail.setJobClass(SubmitJob.class); - mySchedulerService.scheduleFixedDelay(5000, false, jobDetail); - } - @PreDestroy public void stop() throws IOException { if (myHttpClient instanceof CloseableHttpClient) {