split scheduler from service

Ken Stevens 2019-11-19 13:32:20 -05:00
parent 8fca468e1f
commit f3535a2bef
7 changed files with 294 additions and 442 deletions


@@ -0,0 +1,161 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.Validate;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
public abstract class BaseHapiScheduler implements IHapiScheduler {
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiScheduler.class);
private static final AtomicInteger ourNextSchedulerId = new AtomicInteger();
private final String myThreadNamePrefix;
private final AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
private final StdSchedulerFactory myFactory = new StdSchedulerFactory();
private final Properties myProperties = new Properties();
private Scheduler myScheduler;
public BaseHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
myThreadNamePrefix = theThreadNamePrefix;
mySpringBeanJobFactory = theSpringBeanJobFactory;
}
void setInstanceName(String theName) {
myProperties.setProperty(PROP_SCHED_INSTANCE_NAME, theName + "-" + nextSchedulerId());
}
int nextSchedulerId() {
return ourNextSchedulerId.getAndIncrement();
}
@Override
public void init() throws SchedulerException {
setProperties();
myFactory.initialize(myProperties);
myScheduler = myFactory.getScheduler();
myScheduler.setJobFactory(mySpringBeanJobFactory);
myScheduler.standby();
}
private void setProperties() {
myProperties.put("org.quartz.threadPool.threadCount", "4");
myProperties.put("org.quartz.threadPool.threadNamePrefix", myThreadNamePrefix + "-" + myProperties.get(PROP_SCHED_INSTANCE_NAME));
addProperties(myProperties);
}
@Override
public void start() {
try {
myScheduler.start();
} catch (SchedulerException e) {
ourLog.error("Failed to start up scheduler");
throw new ConfigurationException("Failed to start up scheduler", e);
}
}
@Override
public void shutdown() {
try {
myScheduler.shutdown(true);
} catch (SchedulerException e) {
ourLog.error("Failed to shut down scheduler");
throw new ConfigurationException("Failed to shut down scheduler", e);
}
}
@Override
public boolean isStarted() {
try {
return myScheduler.isStarted();
} catch (SchedulerException e) {
ourLog.error("Failed to determine scheduler status");
return false;
}
}
@Override
public void clear() throws SchedulerException {
myScheduler.clear();
}
@Override
public void logStatusForUnitTest() {
try {
Set<JobKey> keys = myScheduler.getJobKeys(GroupMatcher.anyGroup());
String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Local scheduler has jobs: {}", keysString);
} catch (SchedulerException e) {
ourLog.error("Failed to get log status for scheduler", e);
throw new InternalErrorException("Failed to get log status for scheduler", e);
}
}
@Override
public void scheduleFixedDelay(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
Validate.isTrue(theIntervalMillis >= 100);
Validate.notNull(theJobDefinition);
Validate.notNull(theJobDefinition.getJobClass());
Validate.notBlank(theJobDefinition.getId());
JobKey jobKey = new JobKey(theJobDefinition.getId());
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(jobKey);
jobDetail.setName(theJobDefinition.getId());
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
ScheduleBuilder<? extends Trigger> schedule = SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInMilliseconds(theIntervalMillis)
.repeatForever();
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.startNow()
.withSchedule(schedule)
.build();
Set<? extends Trigger> triggers = Sets.newHashSet(trigger);
try {
myScheduler.scheduleJob(jobDetail, triggers, true);
} catch (SchedulerException e) {
ourLog.error("Failed to schedule job", e);
throw new InternalErrorException(e);
}
}
private static class NonConcurrentJobDetailImpl extends JobDetailImpl {
private static final long serialVersionUID = 5716197221121989740L;
// No HAPI FHIR job should allow concurrent execution
@Override
public boolean isConcurrentExectionDisallowed() {
return true;
}
}
abstract void addProperties(Properties theProperties);
// FIXME KHS
// quartzPropertiesClustered(clusteredProperties);
}
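For reference, this is roughly the property set that init() ends up handing to StdSchedulerFactory for the first scheduler created with the "local" instance name and the hapi-fhir-jpa-scheduler thread prefix. It is a sketch derived from setInstanceName() and setProperties() above, not additional configuration:

Properties p = new Properties();
// instance name is "<name>-<next scheduler id>", e.g. the first local scheduler
p.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "local-0");
// fixed pool size set in setProperties()
p.setProperty("org.quartz.threadPool.threadCount", "4");
// thread name prefix + "-" + instance name
p.setProperty("org.quartz.threadPool.threadNamePrefix", "hapi-fhir-jpa-scheduler-local-0");
// subclasses may contribute further settings via addProperties(p)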


@@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.sched;
import java.util.Properties;
public class ClusteredHapiScheduler extends BaseHapiScheduler {
public ClusteredHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
super(theThreadNamePrefix, theSpringBeanJobFactory);
setInstanceName("clustered");
}
/**
* Properties for the cluster scheduler (see the class docs to learn what this means)
*/
@Override
void addProperties(Properties theProperties) {
// theProperties.put("org.quartz.jobStore.tablePrefix", "QRTZHFJC_");
}
}
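The clustering configuration itself is still stubbed out here (note the commented-out table prefix above and the FIXME in BaseHapiScheduler). As a hedged sketch of what addProperties() is for, a genuinely clustered setup would typically point Quartz at a shared JDBC job store; the keys below are standard Quartz properties, the values are illustrative assumptions rather than part of this commit, and a real deployment would also need a data source definition:

@Override
void addProperties(Properties theProperties) {
   // assumed values, for illustration only
   theProperties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
   theProperties.setProperty("org.quartz.jobStore.isClustered", "true");
   theProperties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZHFJC_");
}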


@@ -0,0 +1,45 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HapiNullScheduler implements IHapiScheduler {
private static final Logger ourLog = LoggerFactory.getLogger(HapiNullScheduler.class);
@Override
public void init() {
// nothing
}
@Override
public void start() {
}
@Override
public void shutdown() {
}
@Override
public boolean isStarted() {
return true;
}
@Override
public void clear() throws SchedulerException {
}
@Override
public void logStatusForUnitTest() {
}
@Override
public void scheduleFixedDelay(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
ourLog.debug("Skipping scheduling job {} since scheduling is disabled", theJobDefinition.getId());
}
}


@@ -0,0 +1,20 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.quartz.SchedulerException;
public interface IHapiScheduler {
void init() throws SchedulerException;
void start();
void shutdown();
boolean isStarted();
void clear() throws SchedulerException;
void logStatusForUnitTest();
void scheduleFixedDelay(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
}
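To make the contract concrete, a minimal usage sketch: the class names mirror this commit, while schedulingEnabled, mySpringBeanJobFactory, MyJob, and the fluent setId()/setJobClass() setters on ScheduledJobDefinition are assumed wiring for illustration. init() declares SchedulerException, so callers handle or propagate it:

IHapiScheduler scheduler = schedulingEnabled
   ? new LocalHapiScheduler("hapi-fhir-jpa-scheduler", mySpringBeanJobFactory)
   : new HapiNullScheduler();
scheduler.init();   // builds the Quartz scheduler and leaves it in standby (no-op for the null scheduler)
scheduler.start();
scheduler.scheduleFixedDelay(60 * 1000, new ScheduledJobDefinition()
   .setId(MyJob.class.getName())
   .setJobClass(MyJob.class));
// ... later, during shutdown:
scheduler.shutdown();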


@@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.sched;
import java.util.Properties;
public class LocalHapiScheduler extends BaseHapiScheduler {
public LocalHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
super(theThreadNamePrefix, theSpringBeanJobFactory);
setInstanceName("local");
}
/**
* Properties for the local scheduler (see the class docs to learn what this means)
*/
@Override
void addProperties(Properties theProperties) {
// nothing
}
}


@@ -1,297 +1,15 @@
package ca.uhn.fhir.jpa.sched;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.JobFactory;
import java.util.Properties;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
class NullScheduler extends BaseHapiScheduler {
class NullScheduler implements Scheduler {
@Override
public String getSchedulerName() {
return null;
public NullScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
super(theThreadNamePrefix, theSpringBeanJobFactory);
}
@Override
public String getSchedulerInstanceId() {
return null;
}
@Override
public SchedulerContext getContext() {
return null;
}
@Override
public void start() {
}
@Override
public void startDelayed(int seconds) {
}
@Override
public boolean isStarted() {
return false;
}
@Override
public void standby() {
}
@Override
public boolean isInStandbyMode() {
return false;
}
@Override
public void shutdown() {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public SchedulerMetaData getMetaData() {
return null;
}
@Override
public List<JobExecutionContext> getCurrentlyExecutingJobs() {
return null;
}
@Override
public void setJobFactory(JobFactory factory) {
}
@Override
public ListenerManager getListenerManager() {
return null;
}
@Override
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) {
return null;
}
@Override
public Date scheduleJob(Trigger trigger) {
return null;
}
@Override
public void scheduleJobs(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) {
}
@Override
public void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) {
}
@Override
public boolean unscheduleJob(TriggerKey triggerKey) {
return false;
}
@Override
public boolean unscheduleJobs(List<TriggerKey> triggerKeys) {
return false;
}
@Override
public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) {
return null;
}
@Override
public void addJob(JobDetail jobDetail, boolean replace) {
}
@Override
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) {
}
@Override
public boolean deleteJob(JobKey jobKey) {
return false;
}
@Override
public boolean deleteJobs(List<JobKey> jobKeys) {
return false;
}
@Override
public void triggerJob(JobKey jobKey) {
}
@Override
public void triggerJob(JobKey jobKey, JobDataMap data) {
}
@Override
public void pauseJob(JobKey jobKey) {
}
@Override
public void pauseJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void pauseTrigger(TriggerKey triggerKey) {
}
@Override
public void pauseTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void resumeJob(JobKey jobKey) {
}
@Override
public void resumeJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void resumeTrigger(TriggerKey triggerKey) {
}
@Override
public void resumeTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void pauseAll() {
}
@Override
public void resumeAll() {
}
@Override
public List<String> getJobGroupNames() {
return null;
}
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) {
return null;
}
@Override
public List<? extends Trigger> getTriggersOfJob(JobKey jobKey) {
return null;
}
@Override
public List<String> getTriggerGroupNames() {
return null;
}
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher) {
return null;
}
@Override
public Set<String> getPausedTriggerGroups() {
return null;
}
@Override
public JobDetail getJobDetail(JobKey jobKey) {
return null;
}
@Override
public Trigger getTrigger(TriggerKey triggerKey) {
return null;
}
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) {
return null;
}
@Override
public void resetTriggerFromErrorState(TriggerKey triggerKey) {
}
@Override
public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) {
}
@Override
public boolean deleteCalendar(String calName) {
return false;
}
@Override
public Calendar getCalendar(String calName) {
return null;
}
@Override
public List<String> getCalendarNames() {
return null;
}
@Override
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean checkExists(JobKey jobKey) {
return false;
}
@Override
public boolean checkExists(TriggerKey triggerKey) {
return false;
}
@Override
public void clear() {
void addProperties(Properties theProperties) {
//nothing
}
}


@@ -24,13 +24,7 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.ISmartLifecyclePhase;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.Validate;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,13 +33,7 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.Environment;
import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
/**
* This class provides task scheduling for the entire module using the Quartz library.
@@ -69,13 +57,13 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true";
private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class);
private static AtomicInteger ourNextSchedulerId = new AtomicInteger();
private Scheduler myLocalScheduler;
private Scheduler myClusteredScheduler;
private String myThreadNamePrefix;
public static final String THREAD_NAME_PREFIX = "hapi-fhir-jpa-scheduler";
private IHapiScheduler myLocalScheduler;
private IHapiScheduler myClusteredScheduler;
private boolean myLocalSchedulingEnabled;
private boolean myClusteredSchedulingEnabled;
private AtomicBoolean myStopping = new AtomicBoolean(false);
@Autowired
private AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
@Autowired
@@ -87,7 +75,6 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
* Constructor
*/
public SchedulerServiceImpl() {
setThreadNamePrefix("hapi-fhir-jpa-scheduler");
setLocalSchedulingEnabled(true);
setClusteredSchedulingEnabled(true);
}
@@ -108,55 +95,25 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
myClusteredSchedulingEnabled = theClusteredSchedulingEnabled;
}
public String getThreadNamePrefix() {
return myThreadNamePrefix;
}
public void setThreadNamePrefix(String theThreadNamePrefix) {
myThreadNamePrefix = theThreadNamePrefix;
}
@PostConstruct
public void create() throws SchedulerException {
myLocalScheduler = createLocalScheduler();
myClusteredScheduler = createClusteredScheduler();
myLocalScheduler = createScheduler(false);
myClusteredScheduler = createScheduler(true);
myStopping.set(false);
}
private Scheduler createLocalScheduler() throws SchedulerException {
private IHapiScheduler createScheduler(boolean theClustered) throws SchedulerException {
if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) {
return new NullScheduler();
return new HapiNullScheduler();
}
Properties localProperties = new Properties();
localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId.getAndIncrement());
quartzPropertiesCommon(localProperties);
quartzPropertiesLocal(localProperties);
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(localProperties);
Scheduler scheduler = factory.getScheduler();
configureSchedulerCommon(scheduler);
scheduler.standby();
return scheduler;
}
private Scheduler createClusteredScheduler() throws SchedulerException {
if (!isClusteredSchedulingEnabled() || isSchedulingDisabledForUnitTests()) {
return new NullScheduler();
IHapiScheduler retval;
if (theClustered) {
retval = new ClusteredHapiScheduler(THREAD_NAME_PREFIX, mySpringBeanJobFactory);
} else {
retval = new LocalHapiScheduler(THREAD_NAME_PREFIX, mySpringBeanJobFactory);
}
Properties clusteredProperties = new Properties();
clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId.getAndIncrement());
quartzPropertiesCommon(clusteredProperties);
quartzPropertiesClustered(clusteredProperties);
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(clusteredProperties);
Scheduler scheduler = factory.getScheduler();
configureSchedulerCommon(scheduler);
scheduler.standby();
return scheduler;
}
private void configureSchedulerCommon(Scheduler theScheduler) throws SchedulerException {
theScheduler.setJobFactory(mySpringBeanJobFactory);
retval.init();
return retval;
}
/**
@@ -190,23 +147,13 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
ourLog.info("Shutting down task scheduler...");
myStopping.set(true);
try {
myLocalScheduler.shutdown(true);
myClusteredScheduler.shutdown(true);
} catch (SchedulerException e) {
ourLog.error("Failed to shut down scheduler");
throw new ConfigurationException("Failed to shut down scheduler", e);
}
myLocalScheduler.shutdown();
myClusteredScheduler.shutdown();
}
@Override
public boolean isRunning() {
try {
return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted();
} catch (SchedulerException e) {
ourLog.error("Failed to determine scheduler status", e);
return false;
}
return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted();
}
@Override
@@ -217,87 +164,18 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
@Override
public void logStatusForUnitTest() {
try {
Set<JobKey> keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup());
String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Local scheduler has jobs: {}", keysString);
keys = myClusteredScheduler.getJobKeys(GroupMatcher.anyGroup());
keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Clustered scheduler has jobs: {}", keysString);
} catch (SchedulerException e) {
throw new InternalErrorException(e);
}
myLocalScheduler.logStatusForUnitTest();
myClusteredScheduler.logStatusForUnitTest();
}
@Override
public void scheduleFixedDelayLocal(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleFixedDelay(theIntervalMillis, myLocalScheduler, theJobDefinition);
myLocalScheduler.scheduleFixedDelay(theIntervalMillis, theJobDefinition);
}
@Override
public void scheduleFixedDelayClustered(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleFixedDelay(theIntervalMillis, myClusteredScheduler, theJobDefinition);
}
private void scheduleFixedDelay(long theIntervalMillis, Scheduler theScheduler, ScheduledJobDefinition theJobDefinition) {
Validate.isTrue(theIntervalMillis >= 100);
Validate.notNull(theJobDefinition);
Validate.notNull(theJobDefinition.getJobClass());
Validate.notBlank(theJobDefinition.getId());
JobKey jobKey = new JobKey(theJobDefinition.getId());
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(jobKey);
jobDetail.setName(theJobDefinition.getId());
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
ScheduleBuilder<? extends Trigger> schedule = SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInMilliseconds(theIntervalMillis)
.repeatForever();
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.startNow()
.withSchedule(schedule)
.build();
Set<? extends Trigger> triggers = Sets.newHashSet(trigger);
try {
theScheduler.scheduleJob(jobDetail, triggers, true);
} catch (SchedulerException e) {
ourLog.error("Failed to schedule job", e);
throw new InternalErrorException(e);
}
}
@Override
public boolean isStopping() {
return myStopping.get();
}
/**
* Properties for the local scheduler (see the class docs to learn what this means)
*/
protected void quartzPropertiesLocal(Properties theProperties) {
// nothing
}
/**
* Properties for the cluster scheduler (see the class docs to learn what this means)
*/
protected void quartzPropertiesClustered(Properties theProperties) {
// theProperties.put("org.quartz.jobStore.tablePrefix", "QRTZHFJC_");
}
protected void quartzPropertiesCommon(Properties theProperties) {
theProperties.put("org.quartz.threadPool.threadCount", "4");
theProperties.put("org.quartz.threadPool.threadNamePrefix", getThreadNamePrefix() + "-" + theProperties.get(PROP_SCHED_INSTANCE_NAME));
myClusteredScheduler.scheduleFixedDelay(theIntervalMillis, theJobDefinition);
}
private boolean isSchedulingDisabledForUnitTests() {
@@ -305,15 +183,9 @@ public class SchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
return "true".equals(schedulingDisabled);
}
private static class NonConcurrentJobDetailImpl extends JobDetailImpl {
private static final long serialVersionUID = 5716197221121989740L;
// No HAPI FHIR job should allow concurrent execution
@Override
public boolean isConcurrentExectionDisallowed() {
return true;
}
@Override
public boolean isStopping() {
return myStopping.get();
}
}