diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java index ae6c71ed9b3..079716f3349 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImpl.java @@ -78,7 +78,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class BulkDataExportSvcImpl implements IBulkDataExportSvc { - private static final long REFRESH_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; + private static final long JOB_INTERVAL_MILLIS = 10 * DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class); private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE); @@ -311,13 +311,23 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @PostConstruct public void start() { - ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(REFRESH_INTERVAL)); + ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(JOB_INTERVAL_MILLIS)); myTxTemplate = new TransactionTemplate(myTxManager); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(BulkDataExportSvcImpl.class.getName()); - jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelayClustered(REFRESH_INTERVAL, jobDetail); + jobDetail.setId(this.getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayClustered(JOB_INTERVAL_MILLIS, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private IBulkDataExportSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.buildExportFiles(); + } } @Transactional @@ -466,16 +476,4 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { return null; }); } - - public static class SubmitJob implements HapiJob { - @Autowired - private IBulkDataExportSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.buildExportFiles(); - } - } - - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java index 18a73dd8090..87f2f3e0ac3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java @@ -45,7 +45,7 @@ import java.util.*; @Component public class CacheWarmingSvcImpl implements ICacheWarmingSvc { - public static final long SCHEDULED_JOB_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; + public static final long JOB_INTERVAL_MILLIS = 10 * DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(CacheWarmingSvcImpl.class); @Autowired private DaoConfig myDaoConfig; @@ -81,14 +81,6 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { } - @PostConstruct - public void registerScheduledJob() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(CacheWarmingSvcImpl.class.getName()); - jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelayClustered(SCHEDULED_JOB_INTERVAL, jobDetail); - } - private void refreshNow(WarmCacheEntry theCacheEntry) { String nextUrl = theCacheEntry.getUrl(); @@ -111,6 +103,24 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { @PostConstruct public void start() { initCacheMap(); + scheduleJob(); + } + + public void scheduleJob() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(this.getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayClustered(JOB_INTERVAL_MILLIS, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private ICacheWarmingSvc myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.performWarmingPass(); + } } public synchronized Set initCacheMap() { @@ -127,16 +137,5 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { } return Collections.unmodifiableSet(myCacheEntryToNextRefresh.keySet()); - - } - - public static class SubmitJob implements HapiJob { - @Autowired - private ICacheWarmingSvc myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.performWarmingPass(); - } } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java index 10743b1eb5b..4e9fac5efc6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java @@ -75,7 +75,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; @Service public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc { - public static final long SCHEDULE_DELAY = DateUtils.MILLIS_PER_SECOND; + public static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_SECOND; private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class); private static final int DEFAULT_MAX_SUBMIT = 10000; private final List myActiveJobs = new ArrayList<>(); @@ -157,14 +157,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc return retVal; } - @PostConstruct - public void registerScheduledJob() { - ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName()); - jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class); - mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_DELAY, jobDetail); - } - @Override public void runDeliveryPass() { @@ -354,6 +346,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc @PostConstruct public void start() { + createExecutorService(); + scheduleJob(); + } + + private void createExecutorService() { LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(1000); BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() .namingPattern("SubscriptionTriggering-%d") @@ -384,10 +381,16 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc executorQueue, threadFactory, rejectedExecutionHandler); - } - public static class SubmitJob implements HapiJob { + private void scheduleJob() { + ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); + jobDetail.setId(this.getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDetail); + } + + public static class Job implements HapiJob { @Autowired private ISubscriptionTriggeringSvc myTarget; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index 1092c418b6a..a85ccd1f0e9 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -53,7 +53,7 @@ import java.util.List; public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { - private static final int SCHEDULE_INTERVAL_MILLIS = 5000; + private static final int JOB_INTERVAL_MILLIS = 5000; private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class); @Autowired protected ITermConceptDao myConceptDao; @@ -260,13 +260,24 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { + // FIXME KHS what does this mean? // Register scheduled job to save deferred concepts // In the future it would be great to make this a cluster-aware task somehow ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); - jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred"); - jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); + jobDefinition.setId(this.getClass().getName()); + jobDefinition.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDefinition); + } + + public static class Job implements HapiJob { + @Autowired + private ITermDeferredStorageSvc myTerminologySvc; + + @Override + public void execute(JobExecutionContext theContext) { + myTerminologySvc.saveDeferred(); + } } @VisibleForTesting @@ -288,15 +299,4 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) { myConceptDao = theConceptDao; } - - public static class SaveDeferredJob implements HapiJob { - @Autowired - private ITermDeferredStorageSvc myTerminologySvc; - - @Override - public void execute(JobExecutionContext theContext) { - myTerminologySvc.saveDeferred(); - } - } - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java index b206fcbc1a0..bb4af70c671 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermReindexingSvcImpl.java @@ -54,7 +54,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class TermReindexingSvcImpl implements ITermReindexingSvc { private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class); - private static final long SCHEDULE_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; + private static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; private static boolean ourForceSaveDeferredAlwaysForUnitTest; @Autowired protected ITermConceptDao myConceptDao; @@ -150,16 +150,17 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc { } @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { + // FIXME KHS what does this mean? // Register scheduled job to save deferred concepts // In the future it would be great to make this a cluster-aware task somehow ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); - jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex"); - jobDefinition.setJobClass(SaveDeferredJob.class); - mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); + jobDefinition.setId(this.getClass().getName()); + jobDefinition.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDefinition); } - public static class SaveDeferredJob implements HapiJob { + public static class Job implements HapiJob { @Autowired private ITermDeferredStorageSvc myTerminologySvc; diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java index 6cf22a2a5c1..bd51a3646aa 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/HapiJob.java @@ -22,9 +22,9 @@ package ca.uhn.fhir.jpa.model.sched; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; -import org.quartz.PersistJobDataAfterExecution; @DisallowConcurrentExecution -@PersistJobDataAfterExecution +// FIXME KHS need this? +// @PersistJobDataAfterExecution public interface HapiJob extends Job { } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java index d49c490dc16..6322ec2669a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java @@ -50,7 +50,7 @@ import java.util.concurrent.Semaphore; @Service @Lazy public class SubscriptionLoader { - public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; + public static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes private final Object mySyncSubscriptionsLock = new Object(); @@ -88,11 +88,21 @@ public class SubscriptionLoader { @PostConstruct - public void registerScheduledJob() { + public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); - jobDetail.setId(SubscriptionLoader.class.getName()); - jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class); - mySchedulerService.scheduleFixedDelayLocal(REFRESH_INTERVAL, jobDetail); + jobDetail.setId(this.getClass().getName()); + jobDetail.setJobClass(Job.class); + mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDetail); + } + + public static class Job implements HapiJob { + @Autowired + private SubscriptionLoader myTarget; + + @Override + public void execute(JobExecutionContext theContext) { + myTarget.syncSubscriptions(); + } } @VisibleForTesting @@ -157,15 +167,5 @@ public class SubscriptionLoader { public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { mySubscriptionProvider = theSubscriptionProvider; } - - public static class SubmitJob implements HapiJob { - @Autowired - private SubscriptionLoader myTarget; - - @Override - public void execute(JobExecutionContext theContext) { - myTarget.syncSubscriptions(); - } - } }