standardized job names and how they're scheduled

This commit is contained in:
Ken Stevens 2019-11-29 10:12:13 -05:00
parent a0e363171d
commit 321790f6bf
7 changed files with 88 additions and 87 deletions

View File

@ -78,7 +78,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc { public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final long REFRESH_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; private static final long JOB_INTERVAL_MILLIS = 10 * DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE); private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@ -311,13 +311,23 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@PostConstruct @PostConstruct
public void start() { public void start() {
ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(REFRESH_INTERVAL)); ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(JOB_INTERVAL_MILLIS));
myTxTemplate = new TransactionTemplate(myTxManager); myTxTemplate = new TransactionTemplate(myTxManager);
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(BulkDataExportSvcImpl.class.getName()); jobDetail.setId(this.getClass().getName());
jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class); jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayClustered(REFRESH_INTERVAL, jobDetail); mySchedulerService.scheduleFixedDelayClustered(JOB_INTERVAL_MILLIS, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
} }
@Transactional @Transactional
@ -466,16 +476,4 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return null; return null;
}); });
} }
public static class SubmitJob implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
}
} }

View File

@ -45,7 +45,7 @@ import java.util.*;
@Component @Component
public class CacheWarmingSvcImpl implements ICacheWarmingSvc { public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
public static final long SCHEDULED_JOB_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND; public static final long JOB_INTERVAL_MILLIS = 10 * DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(CacheWarmingSvcImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(CacheWarmingSvcImpl.class);
@Autowired @Autowired
private DaoConfig myDaoConfig; private DaoConfig myDaoConfig;
@ -81,14 +81,6 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
} }
@PostConstruct
public void registerScheduledJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(CacheWarmingSvcImpl.class.getName());
jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelayClustered(SCHEDULED_JOB_INTERVAL, jobDetail);
}
private void refreshNow(WarmCacheEntry theCacheEntry) { private void refreshNow(WarmCacheEntry theCacheEntry) {
String nextUrl = theCacheEntry.getUrl(); String nextUrl = theCacheEntry.getUrl();
@ -111,6 +103,24 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
@PostConstruct @PostConstruct
public void start() { public void start() {
initCacheMap(); initCacheMap();
scheduleJob();
}
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(this.getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayClustered(JOB_INTERVAL_MILLIS, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private ICacheWarmingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.performWarmingPass();
}
} }
public synchronized Set<WarmCacheEntry> initCacheMap() { public synchronized Set<WarmCacheEntry> initCacheMap() {
@ -127,16 +137,5 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
} }
return Collections.unmodifiableSet(myCacheEntryToNextRefresh.keySet()); return Collections.unmodifiableSet(myCacheEntryToNextRefresh.keySet());
}
public static class SubmitJob implements HapiJob {
@Autowired
private ICacheWarmingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.performWarmingPass();
}
} }
} }

View File

@ -75,7 +75,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Service @Service
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc { public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
public static final long SCHEDULE_DELAY = DateUtils.MILLIS_PER_SECOND; public static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
private static final int DEFAULT_MAX_SUBMIT = 10000; private static final int DEFAULT_MAX_SUBMIT = 10000;
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>(); private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
@ -157,14 +157,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return retVal; return retVal;
} }
@PostConstruct
public void registerScheduledJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName());
jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_DELAY, jobDetail);
}
@Override @Override
public void runDeliveryPass() { public void runDeliveryPass() {
@ -354,6 +346,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
@PostConstruct @PostConstruct
public void start() { public void start() {
createExecutorService();
scheduleJob();
}
private void createExecutorService() {
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000); LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SubscriptionTriggering-%d") .namingPattern("SubscriptionTriggering-%d")
@ -384,10 +381,16 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
executorQueue, executorQueue,
threadFactory, threadFactory,
rejectedExecutionHandler); rejectedExecutionHandler);
} }
public static class SubmitJob implements HapiJob { private void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(this.getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDetail);
}
public static class Job implements HapiJob {
@Autowired @Autowired
private ISubscriptionTriggeringSvc myTarget; private ISubscriptionTriggeringSvc myTarget;

View File

@ -53,7 +53,7 @@ import java.util.List;
public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc { public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
private static final int SCHEDULE_INTERVAL_MILLIS = 5000; private static final int JOB_INTERVAL_MILLIS = 5000;
private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class);
@Autowired @Autowired
protected ITermConceptDao myConceptDao; protected ITermConceptDao myConceptDao;
@ -260,13 +260,24 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
} }
@PostConstruct @PostConstruct
public void registerScheduledJob() { public void scheduleJob() {
// FIXME KHS what does this mean?
// Register scheduled job to save deferred concepts // Register scheduled job to save deferred concepts
// In the future it would be great to make this a cluster-aware task somehow // In the future it would be great to make this a cluster-aware task somehow
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred"); jobDefinition.setId(this.getClass().getName());
jobDefinition.setJobClass(SaveDeferredJob.class); jobDefinition.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDefinition);
}
public static class Job implements HapiJob {
@Autowired
private ITermDeferredStorageSvc myTerminologySvc;
@Override
public void execute(JobExecutionContext theContext) {
myTerminologySvc.saveDeferred();
}
} }
@VisibleForTesting @VisibleForTesting
@ -288,15 +299,4 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) { void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) {
myConceptDao = theConceptDao; myConceptDao = theConceptDao;
} }
public static class SaveDeferredJob implements HapiJob {
@Autowired
private ITermDeferredStorageSvc myTerminologySvc;
@Override
public void execute(JobExecutionContext theContext) {
myTerminologySvc.saveDeferred();
}
}
} }

View File

@ -54,7 +54,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class TermReindexingSvcImpl implements ITermReindexingSvc { public class TermReindexingSvcImpl implements ITermReindexingSvc {
private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class);
private static final long SCHEDULE_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; private static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
private static boolean ourForceSaveDeferredAlwaysForUnitTest; private static boolean ourForceSaveDeferredAlwaysForUnitTest;
@Autowired @Autowired
protected ITermConceptDao myConceptDao; protected ITermConceptDao myConceptDao;
@ -150,16 +150,17 @@ public class TermReindexingSvcImpl implements ITermReindexingSvc {
} }
@PostConstruct @PostConstruct
public void registerScheduledJob() { public void scheduleJob() {
// FIXME KHS what does this mean?
// Register scheduled job to save deferred concepts // Register scheduled job to save deferred concepts
// In the future it would be great to make this a cluster-aware task somehow // In the future it would be great to make this a cluster-aware task somehow
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex"); jobDefinition.setId(this.getClass().getName());
jobDefinition.setJobClass(SaveDeferredJob.class); jobDefinition.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayLocal(SCHEDULE_INTERVAL_MILLIS, jobDefinition); mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDefinition);
} }
public static class SaveDeferredJob implements HapiJob { public static class Job implements HapiJob {
@Autowired @Autowired
private ITermDeferredStorageSvc myTerminologySvc; private ITermDeferredStorageSvc myTerminologySvc;

View File

@ -22,9 +22,9 @@ package ca.uhn.fhir.jpa.model.sched;
import org.quartz.DisallowConcurrentExecution; import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job; import org.quartz.Job;
import org.quartz.PersistJobDataAfterExecution;
@DisallowConcurrentExecution @DisallowConcurrentExecution
@PersistJobDataAfterExecution // FIXME KHS need this?
// @PersistJobDataAfterExecution
public interface HapiJob extends Job { public interface HapiJob extends Job {
} }

View File

@ -50,7 +50,7 @@ import java.util.concurrent.Semaphore;
@Service @Service
@Lazy @Lazy
public class SubscriptionLoader { public class SubscriptionLoader {
public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; public static final long JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
private final Object mySyncSubscriptionsLock = new Object(); private final Object mySyncSubscriptionsLock = new Object();
@ -88,11 +88,21 @@ public class SubscriptionLoader {
@PostConstruct @PostConstruct
public void registerScheduledJob() { public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionLoader.class.getName()); jobDetail.setId(this.getClass().getName());
jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class); jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleFixedDelayLocal(REFRESH_INTERVAL, jobDetail); mySchedulerService.scheduleFixedDelayLocal(JOB_INTERVAL_MILLIS, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private SubscriptionLoader myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.syncSubscriptions();
}
} }
@VisibleForTesting @VisibleForTesting
@ -157,15 +167,5 @@ public class SubscriptionLoader {
public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
mySubscriptionProvider = theSubscriptionProvider; mySubscriptionProvider = theSubscriptionProvider;
} }
public static class SubmitJob implements HapiJob {
@Autowired
private SubscriptionLoader myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.syncSubscriptions();
}
}
} }