diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index dbf0cdeac07..759983d09ba 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -83,6 +83,7 @@ public abstract class BaseConfig { public static final String TASK_EXECUTOR_NAME = "hapiJpaTaskExecutor"; public static final String GRAPHQL_PROVIDER_NAME = "myGraphQLProvider"; + private static final String HAPI_DEFAULT_SCHEDULER_GROUP = "HAPI"; @Autowired protected Environment myEnv; @@ -249,7 +250,7 @@ public abstract class BaseConfig { @Bean public ISchedulerService schedulerService() { - return new HapiSchedulerServiceImpl(); + return new HapiSchedulerServiceImpl().setDefaultGroup(HAPI_DEFAULT_SCHEDULER_GROUP); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java index 31792385e84..85f6373e90f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseHapiScheduler.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.commons.lang3.Validate; import org.jetbrains.annotations.NotNull; @@ -129,12 +130,13 @@ public abstract class BaseHapiScheduler implements IHapiScheduler { Validate.notNull(theJobDefinition.getJobClass()); Validate.notBlank(theJobDefinition.getId()); - JobKey jobKey = new JobKey(theJobDefinition.getId()); + JobKey jobKey; + + jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup()); JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl(); jobDetail.setJobClass(theJobDefinition.getJobClass()); jobDetail.setKey(jobKey); - jobDetail.setName(theJobDefinition.getId()); jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData())); ScheduleBuilder schedule = SimpleScheduleBuilder @@ -158,6 +160,12 @@ public abstract class BaseHapiScheduler implements IHapiScheduler { } + @VisibleForTesting + @Override + public Set getJobKeysForUnitTest() throws SchedulerException { + return myScheduler.getJobKeys(GroupMatcher.anyGroup()); + } + private static class NonConcurrentJobDetailImpl extends JobDetailImpl { private static final long serialVersionUID = 5716197221121989740L; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java index 5b19abb16ce..5721134e0fc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/BaseSchedulerServiceImpl.java @@ -26,6 +26,8 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISmartLifecyclePhase; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.util.StopWatch; +import com.google.common.annotations.VisibleForTesting; +import org.quartz.JobKey; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,7 @@ import org.springframework.context.SmartLifecycle; import org.springframework.core.env.Environment; import javax.annotation.PostConstruct; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -65,6 +68,8 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma private boolean myClusteredSchedulingEnabled; private AtomicBoolean myStopping = new AtomicBoolean(false); + private String myDefaultGroup; + @Autowired private Environment myEnvironment; @Autowired @@ -77,6 +82,11 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma setClusteredSchedulingEnabled(true); } + public BaseSchedulerServiceImpl setDefaultGroup(String theDefaultGroup) { + myDefaultGroup = theDefaultGroup; + return this; + } + public boolean isLocalSchedulingEnabled() { return myLocalSchedulingEnabled; } @@ -174,14 +184,32 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma @Override public void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { - ourLog.info("Scheduling local job {} with interval {}", theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis)); - myLocalScheduler.scheduleJob(theIntervalMillis, theJobDefinition); + scheduleJob("local", myLocalScheduler, theIntervalMillis, theJobDefinition); } @Override public void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { - ourLog.info("Scheduling clustered job {} with interval {}", theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis)); - myClusteredScheduler.scheduleJob(theIntervalMillis, theJobDefinition); + scheduleJob("clustered", myClusteredScheduler, theIntervalMillis, theJobDefinition); + } + + private void scheduleJob(String theInstanceName, IHapiScheduler theScheduler, long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { + ourLog.info("Scheduling {} job {} with interval {}", theInstanceName, theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis)); + if (theJobDefinition.getGroup() == null) { + theJobDefinition.setGroup(myDefaultGroup); + } + theScheduler.scheduleJob(theIntervalMillis, theJobDefinition); + } + + @VisibleForTesting + @Override + public Set getLocalJobKeysForUnitTest() throws SchedulerException { + return myLocalScheduler.getJobKeysForUnitTest(); + } + + @VisibleForTesting + @Override + public Set getClusteredJobKeysForUnitTest() throws SchedulerException { + return myClusteredScheduler.getJobKeysForUnitTest(); } private boolean isSchedulingDisabledForUnitTests() { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java index 3bcd690c143..42bc1571137 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/sched/HapiNullScheduler.java @@ -2,10 +2,13 @@ package ca.uhn.fhir.jpa.sched; import ca.uhn.fhir.jpa.model.sched.IHapiScheduler; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; +import org.quartz.JobKey; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; + public class HapiNullScheduler implements IHapiScheduler { private static final Logger ourLog = LoggerFactory.getLogger(HapiNullScheduler.class); @@ -43,4 +46,9 @@ public class HapiNullScheduler implements IHapiScheduler { public void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) { ourLog.debug("Skipping scheduling job {} since scheduling is disabled", theJobDefinition.getId()); } + + @Override + public Set getJobKeysForUnitTest() { + return null; + } } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java index a6a32c42f06..4ce9351369d 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/IHapiScheduler.java @@ -1,7 +1,10 @@ package ca.uhn.fhir.jpa.model.sched; +import org.quartz.JobKey; import org.quartz.SchedulerException; +import java.util.Set; + public interface IHapiScheduler { void init() throws SchedulerException; @@ -16,4 +19,6 @@ public interface IHapiScheduler { void logStatusForUnitTest(); void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + + Set getJobKeysForUnitTest() throws SchedulerException; } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java index f8623d3ea33..656305483f3 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ISchedulerService.java @@ -21,8 +21,11 @@ package ca.uhn.fhir.jpa.model.sched; */ import com.google.common.annotations.VisibleForTesting; +import org.quartz.JobKey; import org.quartz.SchedulerException; +import java.util.Set; + public interface ISchedulerService { @VisibleForTesting @@ -44,5 +47,11 @@ public interface ISchedulerService { */ void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition); + @VisibleForTesting + Set getLocalJobKeysForUnitTest() throws SchedulerException; + + @VisibleForTesting + Set getClusteredJobKeysForUnitTest() throws SchedulerException; + boolean isStopping(); } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java index caa28b0ab32..245dec8721b 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/sched/ScheduledJobDefinition.java @@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.model.sched; */ import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.quartz.Job; import java.util.Collections; @@ -28,10 +29,9 @@ import java.util.HashMap; import java.util.Map; public class ScheduledJobDefinition { - - private Class myJobClass; private String myId; + private String myGroup; private Map myJobData; public Map getJobData() { @@ -60,6 +60,15 @@ public class ScheduledJobDefinition { return this; } + public String getGroup() { + return myGroup; + } + + public ScheduledJobDefinition setGroup(String theGroup) { + myGroup = theGroup; + return this; + } + public void addJobData(String thePropertyName, String thePropertyValue) { Validate.notBlank(thePropertyName); if (myJobData == null) { @@ -68,4 +77,13 @@ public class ScheduledJobDefinition { Validate.isTrue(myJobData.containsKey(thePropertyName) == false); myJobData.put(thePropertyName, thePropertyValue); } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("myJobClass", myJobClass) + .append("myId", myId) + .append("myGroup", myGroup) + .toString(); + } }