Merge pull request #1615 from jamesagnew/ks-20191119-scheduler

Ks 20191119 scheduler
Ken Stevens 2019-12-16 09:58:28 -05:00 committed by GitHub
commit 9aca5c49a0
38 changed files with 846 additions and 1044 deletions

View File

@@ -29,6 +29,9 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
@@ -66,13 +69,6 @@ public abstract class BaseFlywayMigrateDatabaseCommand<T extends Enum> extends B
return MIGRATE_DATABASE;
}
@Override
public List<String> provideUsageNotes() {
String versions = "The following versions are supported: " +
provideAllowedVersions().stream().map(Enum::name).collect(Collectors.joining(", "));
return Collections.singletonList(versions);
}
@Override
public Options getOptions() {
Options retVal = new Options();

View File

@@ -37,6 +37,7 @@ public class HapiFlywayMigrateDatabaseCommandTest {
System.setProperty("test", "true");
}
// TODO INTERMITTENT This just failed for me on CI with a BadSqlGrammarException
@Test
public void testMigrateFrom340() throws IOException, SQLException {

View File

@@ -25,14 +25,14 @@ import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
@@ -53,9 +53,7 @@ import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -80,7 +78,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final long REFRESH_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@@ -313,13 +310,22 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@PostConstruct
public void start() {
ourLog.info("Bulk export service starting with refresh interval {}", StopWatch.formatMillis(REFRESH_INTERVAL));
myTxTemplate = new TransactionTemplate(myTxManager);
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(BulkDataExportSvcImpl.class.getName());
jobDetail.setJobClass(BulkDataExportSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, true, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private IBulkDataExportSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
}
@Transactional
@@ -468,22 +474,4 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return null;
});
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public static class SubmitJob extends FireAtIntervalJob {
@Autowired
private IBulkDataExportSvc myTarget;
public SubmitJob() {
super(REFRESH_INTERVAL);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
myTarget.buildExportFiles();
}
}
}
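Note: this file shows the refactoring pattern repeated throughout this commit. The Quartz-specific SubmitJob (annotated with @DisallowConcurrentExecution and @PersistJobDataAfterExecution, extending FireAtIntervalJob) is replaced by a plain static Job class implementing HapiJob, and the boolean theClusteredTask argument to scheduleFixedDelay() gives way to explicit scheduleClusteredJob()/scheduleLocalJob() calls. HapiJob itself is not included in this diff; judging from how it is used, it is presumably a thin marker interface over the Quartz Job contract, roughly like this sketch (an assumption, not code from this commit):

package ca.uhn.fhir.jpa.model.sched;

import org.quartz.Job;

// Presumed shape of HapiJob (not shown in this diff): a marker interface so job
// classes depend on a single HAPI scheduling abstraction rather than on Quartz
// directly. The per-job @DisallowConcurrentExecution annotations become
// unnecessary because NonConcurrentJobDetailImpl (introduced below) disallows
// concurrent execution centrally for every scheduled job.
public interface HapiJob extends Job {
}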

View File

@@ -10,14 +10,13 @@ import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.delete.DeleteConflictService;
import ca.uhn.fhir.jpa.graphql.JpaStorageServices;
import ca.uhn.fhir.jpa.interceptor.JpaConsentContextServices;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
import ca.uhn.fhir.jpa.sched.AutowiringSpringBeanJobFactory;
import ca.uhn.fhir.jpa.sched.SchedulerServiceImpl;
import ca.uhn.fhir.jpa.sched.HapiSchedulerServiceImpl;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
@@ -36,13 +35,6 @@ import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribable
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.term.TermCodeSystemStorageSvcImpl;
import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
import ca.uhn.fhir.jpa.term.TermReindexingSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReindexingSvc;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
@@ -91,6 +83,7 @@ public abstract class BaseConfig {
public static final String TASK_EXECUTOR_NAME = "hapiJpaTaskExecutor";
public static final String GRAPHQL_PROVIDER_NAME = "myGraphQLProvider";
private static final String HAPI_DEFAULT_SCHEDULER_GROUP = "HAPI";
@Autowired
protected Environment myEnv;
@@ -257,7 +250,7 @@ public abstract class BaseConfig {
@Bean
public ISchedulerService schedulerService() {
return new SchedulerServiceImpl();
return new HapiSchedulerServiceImpl().setDefaultGroup(HAPI_DEFAULT_SCHEDULER_GROUP);
}
@Bean

View File

@@ -0,0 +1,184 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
public abstract class BaseHapiScheduler implements IHapiScheduler {
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiScheduler.class);
private static final AtomicInteger ourNextSchedulerId = new AtomicInteger();
private final String myThreadNamePrefix;
private final AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
private final StdSchedulerFactory myFactory = new StdSchedulerFactory();
private final Properties myProperties = new Properties();
private Scheduler myScheduler;
private String myInstanceName;
public BaseHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
myThreadNamePrefix = theThreadNamePrefix;
mySpringBeanJobFactory = theSpringBeanJobFactory;
}
void setInstanceName(String theInstanceName) {
myInstanceName = theInstanceName;
}
int nextSchedulerId() {
return ourNextSchedulerId.getAndIncrement();
}
@Override
public void init() throws SchedulerException {
setProperties();
myFactory.initialize(myProperties);
myScheduler = myFactory.getScheduler();
myScheduler.setJobFactory(mySpringBeanJobFactory);
myScheduler.standby();
}
protected void setProperties() {
addProperty("org.quartz.threadPool.threadCount", "4");
myProperties.setProperty(PROP_SCHED_INSTANCE_NAME, myInstanceName + "-" + nextSchedulerId());
addProperty("org.quartz.threadPool.threadNamePrefix", getThreadPrefix());
}
@NotNull
private String getThreadPrefix() {
return myThreadNamePrefix + "-" + myInstanceName;
}
protected void addProperty(String key, String value) {
myProperties.put(key, value);
}
@Override
public void start() {
if (myScheduler == null) {
throw new ConfigurationException("Attempt to start uninitialized scheduler");
}
try {
ourLog.info("Starting scheduler {}", getThreadPrefix());
myScheduler.start();
} catch (SchedulerException e) {
ourLog.error("Failed to start up scheduler", e);
throw new ConfigurationException("Failed to start up scheduler", e);
}
}
@Override
public void shutdown() {
if (myScheduler == null) {
return;
}
try {
myScheduler.shutdown(true);
} catch (SchedulerException e) {
ourLog.error("Failed to shut down scheduler", e);
throw new ConfigurationException("Failed to shut down scheduler", e);
}
}
@Override
public boolean isStarted() {
try {
return myScheduler != null && myScheduler.isStarted();
} catch (SchedulerException e) {
ourLog.error("Failed to determine scheduler status", e);
return false;
}
}
@Override
public void clear() throws SchedulerException {
myScheduler.clear();
}
@Override
public void logStatusForUnitTest() {
try {
Set<JobKey> keys = myScheduler.getJobKeys(GroupMatcher.anyGroup());
String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Local scheduler has jobs: {}", keysString);
} catch (SchedulerException e) {
ourLog.error("Failed to get log status for scheduler", e);
throw new InternalErrorException("Failed to get log status for scheduler", e);
}
}
@Override
public void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
Validate.isTrue(theIntervalMillis >= 100);
Validate.notNull(theJobDefinition);
Validate.notNull(theJobDefinition.getJobClass());
Validate.notBlank(theJobDefinition.getId());
JobKey jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup());
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(jobKey);
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
ScheduleBuilder<? extends Trigger> schedule = SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInMilliseconds(theIntervalMillis)
.repeatForever();
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.startNow()
.withSchedule(schedule)
.build();
Set<? extends Trigger> triggers = Sets.newHashSet(trigger);
try {
myScheduler.scheduleJob(jobDetail, triggers, true);
} catch (SchedulerException e) {
ourLog.error("Failed to schedule job", e);
throw new InternalErrorException(e);
}
}
@VisibleForTesting
@Override
public Set<JobKey> getJobKeysForUnitTest() throws SchedulerException {
return myScheduler.getJobKeys(GroupMatcher.anyGroup());
}
private static class NonConcurrentJobDetailImpl extends JobDetailImpl {
private static final long serialVersionUID = 5716197221121989740L;
// No HAPI FHIR job should allow concurrent execution
@Override
public boolean isConcurrentExectionDisallowed() {
return true;
}
}
}
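BaseHapiScheduler implements IHapiScheduler, another interface that is not part of this diff. Reconstructing it from the @Override methods above gives roughly the following sketch (inferred from usage; the real interface in ca.uhn.fhir.jpa.model.sched may differ in detail):

package ca.uhn.fhir.jpa.model.sched;

import org.quartz.JobKey;
import org.quartz.SchedulerException;

import java.util.Set;

// Inferred from the methods BaseHapiScheduler overrides above; every concrete
// scheduler (local, clustered, or the null implementation) supplies these.
public interface IHapiScheduler {
    void init() throws SchedulerException;

    void start();

    void shutdown();

    boolean isStarted();

    void clear() throws SchedulerException;

    void logStatusForUnitTest();

    void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);

    Set<JobKey> getJobKeysForUnitTest() throws SchedulerException;
}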

View File

@@ -0,0 +1,225 @@
package ca.uhn.fhir.jpa.sched;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2019 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ISmartLifecyclePhase;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.Environment;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class provides task scheduling for the entire module using the Quartz library.
* Inside here, we have two schedulers:
* <ul>
* <li>
* The <b>Local Scheduler</b> handles tasks that need to execute locally. This
* typically means things that should happen on all nodes in a clustered
* environment.
* </li>
* <li>
* The <b>Cluster Scheduler</b> handles tasks that are distributed and should be
* handled by only one node in the cluster (assuming a clustered server). If the
* server is not clustered, this scheduler acts the same way as the
* local scheduler.
* </li>
* </ul>
*/
public abstract class BaseSchedulerServiceImpl implements ISchedulerService, SmartLifecycle {
public static final String SCHEDULING_DISABLED = "scheduling_disabled";
public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true";
private static final Logger ourLog = LoggerFactory.getLogger(BaseSchedulerServiceImpl.class);
private IHapiScheduler myLocalScheduler;
private IHapiScheduler myClusteredScheduler;
private boolean myLocalSchedulingEnabled;
private boolean myClusteredSchedulingEnabled;
private AtomicBoolean myStopping = new AtomicBoolean(false);
private String myDefaultGroup;
@Autowired
private Environment myEnvironment;
@Autowired
private ApplicationContext myApplicationContext;
@Autowired
protected AutowiringSpringBeanJobFactory mySchedulerJobFactory;
public BaseSchedulerServiceImpl() {
setLocalSchedulingEnabled(true);
setClusteredSchedulingEnabled(true);
}
public BaseSchedulerServiceImpl setDefaultGroup(String theDefaultGroup) {
myDefaultGroup = theDefaultGroup;
return this;
}
public boolean isLocalSchedulingEnabled() {
return myLocalSchedulingEnabled;
}
public void setLocalSchedulingEnabled(boolean theLocalSchedulingEnabled) {
myLocalSchedulingEnabled = theLocalSchedulingEnabled;
}
public boolean isClusteredSchedulingEnabled() {
return myClusteredSchedulingEnabled;
}
public void setClusteredSchedulingEnabled(boolean theClusteredSchedulingEnabled) {
myClusteredSchedulingEnabled = theClusteredSchedulingEnabled;
}
@PostConstruct
public void create() throws SchedulerException {
myLocalScheduler = createScheduler(false);
myClusteredScheduler = createScheduler(true);
myStopping.set(false);
}
private IHapiScheduler createScheduler(boolean theClustered) throws SchedulerException {
boolean schedulingEnabled = theClustered ? isClusteredSchedulingEnabled() : isLocalSchedulingEnabled();
if (!schedulingEnabled || isSchedulingDisabledForUnitTests()) {
ourLog.info("Scheduling is disabled on this server");
return new HapiNullScheduler();
}
IHapiScheduler retval;
if (theClustered) {
ourLog.info("Creating Clustered Scheduler");
retval = getClusteredScheduler();
} else {
ourLog.info("Creating Local Scheduler");
retval = getLocalHapiScheduler();
}
retval.init();
return retval;
}
protected abstract IHapiScheduler getLocalHapiScheduler();
protected abstract IHapiScheduler getClusteredScheduler();
/**
* We defer starting scheduled tasks until the application context has fully
* started and we are sure we are ready for them
*/
@Override
public int getPhase() {
return ISmartLifecyclePhase.SCHEDULER_1000;
}
@Override
public void start() {
try {
ourLog.info("Starting task schedulers for context {}", myApplicationContext.getId());
if (myLocalScheduler != null) {
myLocalScheduler.start();
}
if (myClusteredScheduler != null) {
myClusteredScheduler.start();
}
} catch (Exception e) {
ourLog.error("Failed to start scheduler", e);
throw new ConfigurationException("Failed to start scheduler", e);
}
}
@Override
public void stop() {
ourLog.info("Shutting down task scheduler...");
myStopping.set(true);
myLocalScheduler.shutdown();
myClusteredScheduler.shutdown();
}
@Override
public boolean isRunning() {
return !myStopping.get() && myLocalScheduler.isStarted() && myClusteredScheduler.isStarted();
}
@Override
public void purgeAllScheduledJobsForUnitTest() throws SchedulerException {
myLocalScheduler.clear();
myClusteredScheduler.clear();
}
@Override
public void logStatusForUnitTest() {
myLocalScheduler.logStatusForUnitTest();
myClusteredScheduler.logStatusForUnitTest();
}
@Override
public void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleJob("local", myLocalScheduler, theIntervalMillis, theJobDefinition);
}
@Override
public void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
scheduleJob("clustered", myClusteredScheduler, theIntervalMillis, theJobDefinition);
}
private void scheduleJob(String theInstanceName, IHapiScheduler theScheduler, long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
ourLog.info("Scheduling {} job {} with interval {}", theInstanceName, theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis));
if (theJobDefinition.getGroup() == null) {
theJobDefinition.setGroup(myDefaultGroup);
}
theScheduler.scheduleJob(theIntervalMillis, theJobDefinition);
}
@VisibleForTesting
@Override
public Set<JobKey> getLocalJobKeysForUnitTest() throws SchedulerException {
return myLocalScheduler.getJobKeysForUnitTest();
}
@VisibleForTesting
@Override
public Set<JobKey> getClusteredJobKeysForUnitTest() throws SchedulerException {
return myClusteredScheduler.getJobKeysForUnitTest();
}
private boolean isSchedulingDisabledForUnitTests() {
String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED);
return "true".equals(schedulingDisabled);
}
@Override
public boolean isStopping() {
return myStopping.get();
}
}
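The @Autowired fields inside the static Job classes above (for example myTarget in BulkDataExportSvcImpl.Job) are only populated because each scheduler is handed mySchedulerJobFactory as its Quartz JobFactory. AutowiringSpringBeanJobFactory predates this commit and its source is not shown here; a factory of this kind is conventionally implemented along the following lines (a sketch under that assumption, not the project's actual code):

package ca.uhn.fhir.jpa.sched;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

// Conventional Spring/Quartz bridge: let Quartz instantiate the job class
// reflectively, then run Spring autowiring over the new instance so fields
// such as myTarget are injected before execute() is called.
public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private AutowireCapableBeanFactory myBeanFactory;

    @Override
    public void setApplicationContext(ApplicationContext theApplicationContext) {
        myBeanFactory = theApplicationContext.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle theBundle) throws Exception {
        Object job = super.createJobInstance(theBundle);
        myBeanFactory.autowireBean(job);
        return job;
    }
}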

View File

@@ -0,0 +1,8 @@
package ca.uhn.fhir.jpa.sched;
public class ClusteredHapiScheduler extends BaseHapiScheduler {
public ClusteredHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
super(theThreadNamePrefix, theSpringBeanJobFactory);
setInstanceName("clustered");
}
}

View File

@@ -0,0 +1,54 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Set;
public class HapiNullScheduler implements IHapiScheduler {
private static final Logger ourLog = LoggerFactory.getLogger(HapiNullScheduler.class);
@Override
public void init() {
// nothing
}
@Override
public void start() {
}
@Override
public void shutdown() {
}
@Override
public boolean isStarted() {
return true;
}
@Override
public void clear() throws SchedulerException {
}
@Override
public void logStatusForUnitTest() {
}
@Override
public void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
ourLog.debug("Skipping scheduling job {} since scheduling is disabled", theJobDefinition.getId());
}
@Override
public Set<JobKey> getJobKeysForUnitTest() {
return Collections.emptySet();
}
}

View File

@@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
public class HapiSchedulerServiceImpl extends BaseSchedulerServiceImpl {
public static final String THREAD_NAME_PREFIX = "hapi-fhir-jpa-scheduler";
@Override
protected IHapiScheduler getLocalHapiScheduler() {
return new LocalHapiScheduler(THREAD_NAME_PREFIX, mySchedulerJobFactory);
}
@Override
protected IHapiScheduler getClusteredScheduler() {
return new ClusteredHapiScheduler(THREAD_NAME_PREFIX, mySchedulerJobFactory);
}
}

View File

@@ -0,0 +1,8 @@
package ca.uhn.fhir.jpa.sched;
public class LocalHapiScheduler extends BaseHapiScheduler {
public LocalHapiScheduler(String theThreadNamePrefix, AutowiringSpringBeanJobFactory theSpringBeanJobFactory) {
super(theThreadNamePrefix, theSpringBeanJobFactory);
setInstanceName("local");
}
}

View File

@@ -1,579 +0,0 @@
package ca.uhn.fhir.jpa.sched;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2019 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.Validate;
import org.quartz.Calendar;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.JobFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
/**
* This class provides task scheduling for the entire module using the Quartz library.
* Inside here, we have two schedulers:
* <ul>
* <li>
* The <b>Local Scheduler</b> handles tasks that need to execute locally. This
* typically means things that should happen on all nodes in a clustered
* environment.
* </li>
* <li>
* The <b>Cluster Scheduler</b> handles tasks that are distributed and should be
* handled by only one node in the cluster (assuming a clustered server). If the
* server is not clustered, this scheduler acts the same way as the
* local scheduler.
* </li>
* </ul>
*/
public class SchedulerServiceImpl implements ISchedulerService {
public static final String SCHEDULING_DISABLED = "scheduling_disabled";
public static final String SCHEDULING_DISABLED_EQUALS_TRUE = SCHEDULING_DISABLED + "=true";
private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImpl.class);
private static int ourNextSchedulerId = 0;
private Scheduler myLocalScheduler;
private Scheduler myClusteredScheduler;
private String myThreadNamePrefix;
private boolean myLocalSchedulingEnabled;
private boolean myClusteredSchedulingEnabled;
@Autowired
private AutowiringSpringBeanJobFactory mySpringBeanJobFactory;
private AtomicBoolean myStopping = new AtomicBoolean(false);
@Autowired
private Environment myEnvironment;
/**
* Constructor
*/
public SchedulerServiceImpl() {
setThreadNamePrefix("hapi-fhir-jpa-scheduler");
setLocalSchedulingEnabled(true);
setClusteredSchedulingEnabled(true);
}
public boolean isLocalSchedulingEnabled() {
return myLocalSchedulingEnabled;
}
public void setLocalSchedulingEnabled(boolean theLocalSchedulingEnabled) {
myLocalSchedulingEnabled = theLocalSchedulingEnabled;
}
public boolean isClusteredSchedulingEnabled() {
return myClusteredSchedulingEnabled;
}
public void setClusteredSchedulingEnabled(boolean theClusteredSchedulingEnabled) {
myClusteredSchedulingEnabled = theClusteredSchedulingEnabled;
}
public String getThreadNamePrefix() {
return myThreadNamePrefix;
}
public void setThreadNamePrefix(String theThreadNamePrefix) {
myThreadNamePrefix = theThreadNamePrefix;
}
@PostConstruct
public void start() throws SchedulerException {
myLocalScheduler = createLocalScheduler();
myClusteredScheduler = createClusteredScheduler();
myStopping.set(false);
}
/**
* We defer startup of executing started tasks until we're sure we're ready for it
* and the startup is completely done
*/
@EventListener
public void contextStarted(ContextRefreshedEvent theEvent) throws SchedulerException {
try {
ourLog.info("Starting task schedulers for context {}", theEvent != null ? theEvent.getApplicationContext().getId() : "null");
if (myLocalScheduler != null) {
myLocalScheduler.start();
}
if (myClusteredScheduler != null) {
myClusteredScheduler.start();
}
} catch (Exception e) {
ourLog.error("Failed to start context", e);
throw new SchedulerException(e);
}
}
private Scheduler createLocalScheduler() throws SchedulerException {
if (!isLocalSchedulingEnabled() || isSchedulingDisabledForUnitTests()) {
return new NullScheduler();
}
Properties localProperties = new Properties();
localProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "local-" + ourNextSchedulerId++);
quartzPropertiesCommon(localProperties);
quartzPropertiesLocal(localProperties);
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(localProperties);
Scheduler scheduler = factory.getScheduler();
configureSchedulerCommon(scheduler);
scheduler.standby();
return scheduler;
}
private Scheduler createClusteredScheduler() throws SchedulerException {
if (!isClusteredSchedulingEnabled() || isSchedulingDisabledForUnitTests()) {
return new NullScheduler();
}
Properties clusteredProperties = new Properties();
clusteredProperties.setProperty(PROP_SCHED_INSTANCE_NAME, "clustered-" + ourNextSchedulerId++);
quartzPropertiesCommon(clusteredProperties);
quartzPropertiesClustered(clusteredProperties);
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(clusteredProperties);
Scheduler scheduler = factory.getScheduler();
configureSchedulerCommon(scheduler);
scheduler.standby();
return scheduler;
}
private void configureSchedulerCommon(Scheduler theScheduler) throws SchedulerException {
theScheduler.setJobFactory(mySpringBeanJobFactory);
}
@PreDestroy
public void stop() throws SchedulerException {
ourLog.info("Shutting down task scheduler...");
myStopping.set(true);
myLocalScheduler.shutdown(true);
myClusteredScheduler.shutdown(true);
}
@Override
public void purgeAllScheduledJobsForUnitTest() throws SchedulerException {
myLocalScheduler.clear();
myClusteredScheduler.clear();
}
@Override
public void logStatus() {
try {
Set<JobKey> keys = myLocalScheduler.getJobKeys(GroupMatcher.anyGroup());
String keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Local scheduler has jobs: {}", keysString);
keys = myClusteredScheduler.getJobKeys(GroupMatcher.anyGroup());
keysString = keys.stream().map(t -> t.getName()).collect(Collectors.joining(", "));
ourLog.info("Clustered scheduler has jobs: {}", keysString);
} catch (SchedulerException e) {
throw new InternalErrorException(e);
}
}
@Override
public void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition) {
Validate.isTrue(theIntervalMillis >= 100);
Validate.notNull(theJobDefinition);
Validate.notNull(theJobDefinition.getJobClass());
Validate.notBlank(theJobDefinition.getId());
JobKey jobKey = new JobKey(theJobDefinition.getId());
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(jobKey);
jobDetail.setName(theJobDefinition.getId());
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
ScheduleBuilder<? extends Trigger> schedule = SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInMilliseconds(theIntervalMillis)
.repeatForever();
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.startNow()
.withSchedule(schedule)
.build();
Set<? extends Trigger> triggers = Sets.newHashSet(trigger);
try {
Scheduler scheduler;
if (theClusteredTask) {
scheduler = myClusteredScheduler;
} else {
scheduler = myLocalScheduler;
}
scheduler.scheduleJob(jobDetail, triggers, true);
} catch (SchedulerException e) {
ourLog.error("Failed to schedule job", e);
throw new InternalErrorException(e);
}
}
@Override
public boolean isStopping() {
return myStopping.get();
}
/**
* Properties for the local scheduler (see the class docs to learn what this means)
*/
protected void quartzPropertiesLocal(Properties theProperties) {
// nothing
}
/**
* Properties for the cluster scheduler (see the class docs to learn what this means)
*/
protected void quartzPropertiesClustered(Properties theProperties) {
// theProperties.put("org.quartz.jobStore.tablePrefix", "QRTZHFJC_");
}
protected void quartzPropertiesCommon(Properties theProperties) {
theProperties.put("org.quartz.threadPool.threadCount", "4");
theProperties.put("org.quartz.threadPool.threadNamePrefix", getThreadNamePrefix() + "-" + theProperties.get(PROP_SCHED_INSTANCE_NAME));
}
private boolean isSchedulingDisabledForUnitTests() {
String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED);
return "true".equals(schedulingDisabled);
}
private static class NonConcurrentJobDetailImpl extends JobDetailImpl {
private static final long serialVersionUID = 5716197221121989740L;
// All HAPI FHIR jobs shouldn't allow concurrent execution
@Override
public boolean isConcurrentExectionDisallowed() {
return true;
}
}
private static class NullScheduler implements Scheduler {
@Override
public String getSchedulerName() {
return null;
}
@Override
public String getSchedulerInstanceId() {
return null;
}
@Override
public SchedulerContext getContext() {
return null;
}
@Override
public void start() {
}
@Override
public void startDelayed(int seconds) {
}
@Override
public boolean isStarted() {
return false;
}
@Override
public void standby() {
}
@Override
public boolean isInStandbyMode() {
return false;
}
@Override
public void shutdown() {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public SchedulerMetaData getMetaData() {
return null;
}
@Override
public List<JobExecutionContext> getCurrentlyExecutingJobs() {
return null;
}
@Override
public void setJobFactory(JobFactory factory) {
}
@Override
public ListenerManager getListenerManager() {
return null;
}
@Override
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) {
return null;
}
@Override
public Date scheduleJob(Trigger trigger) {
return null;
}
@Override
public void scheduleJobs(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) {
}
@Override
public void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) {
}
@Override
public boolean unscheduleJob(TriggerKey triggerKey) {
return false;
}
@Override
public boolean unscheduleJobs(List<TriggerKey> triggerKeys) {
return false;
}
@Override
public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) {
return null;
}
@Override
public void addJob(JobDetail jobDetail, boolean replace) {
}
@Override
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) {
}
@Override
public boolean deleteJob(JobKey jobKey) {
return false;
}
@Override
public boolean deleteJobs(List<JobKey> jobKeys) {
return false;
}
@Override
public void triggerJob(JobKey jobKey) {
}
@Override
public void triggerJob(JobKey jobKey, JobDataMap data) {
}
@Override
public void pauseJob(JobKey jobKey) {
}
@Override
public void pauseJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void pauseTrigger(TriggerKey triggerKey) {
}
@Override
public void pauseTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void resumeJob(JobKey jobKey) {
}
@Override
public void resumeJobs(GroupMatcher<JobKey> matcher) {
}
@Override
public void resumeTrigger(TriggerKey triggerKey) {
}
@Override
public void resumeTriggers(GroupMatcher<TriggerKey> matcher) {
}
@Override
public void pauseAll() {
}
@Override
public void resumeAll() {
}
@Override
public List<String> getJobGroupNames() {
return null;
}
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) {
return null;
}
@Override
public List<? extends Trigger> getTriggersOfJob(JobKey jobKey) {
return null;
}
@Override
public List<String> getTriggerGroupNames() {
return null;
}
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher) {
return null;
}
@Override
public Set<String> getPausedTriggerGroups() {
return null;
}
@Override
public JobDetail getJobDetail(JobKey jobKey) {
return null;
}
@Override
public Trigger getTrigger(TriggerKey triggerKey) {
return null;
}
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) {
return null;
}
@Override
public void resetTriggerFromErrorState(TriggerKey triggerKey) {
}
@Override
public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) {
}
@Override
public boolean deleteCalendar(String calName) {
return false;
}
@Override
public Calendar getCalendar(String calName) {
return null;
}
@Override
public List<String> getCalendarNames() {
return null;
}
@Override
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException {
return false;
}
@Override
public boolean checkExists(JobKey jobKey) {
return false;
}
@Override
public boolean checkExists(TriggerKey triggerKey) {
return false;
}
@Override
public void clear() {
}
}
}

View File

@@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.search;
*/
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
@@ -58,11 +58,21 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(StaleSearchDeletingSvcImpl.class.getName());
jobDetail.setJobClass(StaleSearchDeletingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(DEFAULT_CUTOFF_SLACK, true, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(DEFAULT_CUTOFF_SLACK, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private IStaleSearchDeletingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.schedulePollForStaleSearches();
}
}
@Transactional(propagation = Propagation.NEVER)
@@ -72,14 +82,4 @@
pollForStaleSearchesAndDeleteThem();
}
}
public static class SubmitJob implements Job {
@Autowired
private IStaleSearchDeletingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.schedulePollForStaleSearches();
}
}
}

View File

@@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.search.cache;
*/
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
@@ -51,11 +51,11 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc {
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(BaseSearchCacheSvcImpl.class.getName());
jobDetail.setJobClass(BaseSearchCacheSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
@Override
@@ -73,7 +73,7 @@ public abstract class BaseSearchCacheSvcImpl implements ISearchCacheSvc {
protected abstract void flushLastUpdated(Long theSearchId, Date theLastUpdated);
public static class SubmitJob implements Job {
public static class Job implements HapiJob {
@Autowired
private ISearchCacheSvc myTarget;

View File

@@ -33,6 +33,7 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
@@ -46,7 +47,6 @@ import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.search.util.impl.Executors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,10 +142,16 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
myContext = theContext;
}
@VisibleForTesting
void setSchedulerServiceForUnitTest(ISchedulerService theSchedulerService) {
mySchedulerService = theSchedulerService;
}
@PostConstruct
public void start() {
myTxTemplate = new TransactionTemplate(myTxManager);
initExecutor();
scheduleJob();
}
public void initExecutor() {
@@ -160,6 +166,13 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
);
}
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public Long markAllResourcesForReindexing() {
@@ -187,12 +200,14 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
return job.getId();
}
@PostConstruct
public void registerScheduledJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(ResourceReindexingSvcImpl.class.getName());
jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, true, jobDetail);
public static class Job implements HapiJob {
@Autowired
private IResourceReindexingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.runReindexingPass();
}
}
@VisibleForTesting
@@ -545,14 +560,4 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
return myUpdated;
}
}
public static class SubmitJob implements Job {
@Autowired
private IResourceReindexingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.runReindexingPass();
}
}
}

View File

@@ -26,16 +26,14 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -47,7 +45,6 @@ import java.util.*;
@Component
public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
public static final long SCHEDULED_JOB_INTERVAL = 10 * DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(CacheWarmingSvcImpl.class);
@Autowired
private DaoConfig myDaoConfig;
@@ -83,14 +80,6 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
}
@PostConstruct
public void registerScheduledJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(CacheWarmingSvcImpl.class.getName());
jobDetail.setJobClass(CacheWarmingSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULED_JOB_INTERVAL, true, jobDetail);
}
private void refreshNow(WarmCacheEntry theCacheEntry) {
String nextUrl = theCacheEntry.getUrl();
@@ -113,6 +102,24 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
@PostConstruct
public void start() {
initCacheMap();
scheduleJob();
}
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private ICacheWarmingSvc myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.performWarmingPass();
}
}
public synchronized Set<WarmCacheEntry> initCacheMap() {
@@ -129,22 +136,5 @@
}
return Collections.unmodifiableSet(myCacheEntryToNextRefresh.keySet());
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public static class SubmitJob extends FireAtIntervalJob {
@Autowired
private ICacheWarmingSvc myTarget;
public SubmitJob() {
super(SCHEDULED_JOB_INTERVAL);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
myTarget.performWarmingPass();
}
}
}

View File

@@ -26,7 +26,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
@@ -55,9 +55,7 @@ import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -77,7 +75,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Service
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
public static final long SCHEDULE_DELAY = DateUtils.MILLIS_PER_SECOND;
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
private static final int DEFAULT_MAX_SUBMIT = 10000;
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
@@ -159,14 +156,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return retVal;
}
@PostConstruct
public void registerScheduledJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionTriggeringSvcImpl.class.getName());
jobDetail.setJobClass(SubscriptionTriggeringSvcImpl.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_DELAY, false, jobDetail);
}
@Override
public void runDeliveryPass() {
@@ -356,6 +345,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
@PostConstruct
public void start() {
createExecutorService();
scheduleJob();
}
private void createExecutorService() {
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SubscriptionTriggering-%d")
@@ -386,21 +380,21 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public static class SubmitJob extends FireAtIntervalJob {
private void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_SECOND, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private ISubscriptionTriggeringSvc myTarget;
public SubmitJob() {
super(SCHEDULE_DELAY);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
public void execute(JobExecutionContext theContext) {
myTarget.runDeliveryPass();
}
}

View File

@@ -23,18 +23,14 @@ package ca.uhn.fhir.jpa.term;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.support.IContextValidationSupport;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.IFhirResourceDaoValueSet.ValidateCodeResult;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum;
import ca.uhn.fhir.jpa.model.cross.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
@@ -60,11 +56,7 @@ import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.TermsQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MultiPhraseQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.RegexpQuery;
import org.apache.lucene.search.*;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.search.jpa.FullTextEntityManager;
@@ -78,7 +70,6 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -101,21 +92,14 @@ import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Join;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import javax.persistence.criteria.*;
import javax.validation.constraints.NotNull;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNoneBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.*;
public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationContextAware {
public static final int DEFAULT_FETCH_SIZE = 250;
@@ -222,23 +206,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo
return retVal;
}
@PostConstruct
public void buildTranslationCaches() {
Long timeout = myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes();
myTranslationCache =
Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(timeout, TimeUnit.MINUTES)
.build();
myTranslationWithReverseCache =
Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(timeout, TimeUnit.MINUTES)
.build();
}
/**
* This method is present only for unit tests, do not call from client code
*/
@@ -1316,16 +1283,44 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo
RuleBasedTransactionAttribute rules = new RuleBasedTransactionAttribute();
rules.getRollbackRules().add(new NoRollbackRuleAttribute(ExpansionTooCostlyException.class));
myTxTemplate = new TransactionTemplate(myTransactionManager, rules);
buildTranslationCaches();
scheduleJob();
}
@PostConstruct
public void registerScheduledJob() {
private void buildTranslationCaches() {
Long timeout = myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes();
myTranslationCache =
Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(timeout, TimeUnit.MINUTES)
.build();
myTranslationWithReverseCache =
Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(timeout, TimeUnit.MINUTES)
.build();
}
public void scheduleJob() {
// TODO KHS what does this mean?
// Register scheduled job to pre-expand ValueSets
// In the future it would be great to make this a cluster-aware task somehow
ScheduledJobDefinition vsJobDefinition = new ScheduledJobDefinition();
vsJobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_preExpandValueSets");
vsJobDefinition.setJobClass(PreExpandValueSetsJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, true, vsJobDefinition);
vsJobDefinition.setId(getClass().getName());
vsJobDefinition.setJobClass(Job.class);
mySchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_MINUTE, vsJobDefinition);
}
public static class Job implements HapiJob {
@Autowired
private ITermReadSvc myTerminologySvc;
@Override
public void execute(JobExecutionContext theContext) {
myTerminologySvc.preExpandDeferredValueSetsToTerminologyTables();
}
}
@Override
@@ -1881,17 +1876,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc, ApplicationCo
.findFirst();
}
public static class PreExpandValueSetsJob implements Job {
@Autowired
private ITermReadSvc myTerminologySvc;
@Override
public void execute(JobExecutionContext theContext) {
myTerminologySvc.preExpandDeferredValueSetsToTerminologyTables();
}
}
static List<TermConcept> toPersistedConcepts(List<CodeSystem.ConceptDefinitionComponent> theConcept, TermCodeSystemVersion theCodeSystemVersion) {
ArrayList<TermConcept> retVal = new ArrayList<>();

View File

@@ -25,7 +25,7 @@ import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
@@ -53,7 +53,6 @@ import java.util.List;
public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
private static final int SCHEDULE_INTERVAL_MILLIS = 5000;
private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class);
@Autowired
protected ITermConceptDao myConceptDao;
@@ -260,13 +259,24 @@
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
// TODO KHS what does this mean?
// Register scheduled job to save deferred concepts
// In the future it would be great to make this a cluster-aware task somehow
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(BaseTermReadSvcImpl.class.getName() + "_saveDeferred");
jobDefinition.setJobClass(SaveDeferredJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition);
jobDefinition.setId(this.getClass().getName());
jobDefinition.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(5000, jobDefinition);
}
public static class Job implements HapiJob {
@Autowired
private ITermDeferredStorageSvc myTerminologySvc;
@Override
public void execute(JobExecutionContext theContext) {
myTerminologySvc.saveDeferred();
}
}
@VisibleForTesting
@@ -288,23 +298,4 @@
void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) {
myConceptDao = theConceptDao;
}
public static class SaveDeferredJob extends FireAtIntervalJob {
@Autowired
private ITermDeferredStorageSvc myTerminologySvc;
/**
* Constructor
*/
public SaveDeferredJob() {
super(SCHEDULE_INTERVAL_MILLIS);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
myTerminologySvc.saveDeferred();
}
}
}

View File

@@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.term;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
@@ -54,7 +54,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class TermReindexingSvcImpl implements ITermReindexingSvc {
private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class);
private static final long SCHEDULE_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
private static boolean ourForceSaveDeferredAlwaysForUnitTest;
@Autowired
protected ITermConceptDao myConceptDao;
@@ -150,29 +149,22 @@
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
// TODO KHS what does this mean?
// Register scheduled job to save deferred concepts
// In the future it would be great to make this a cluster-aware task somehow
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
jobDefinition.setId(TermReindexingSvcImpl.class.getName() + "_reindex");
jobDefinition.setJobClass(SaveDeferredJob.class);
mySchedulerService.scheduleFixedDelay(SCHEDULE_INTERVAL_MILLIS, false, jobDefinition);
jobDefinition.setId(this.getClass().getName());
jobDefinition.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
}
public static class SaveDeferredJob extends FireAtIntervalJob {
public static class Job implements HapiJob {
@Autowired
private ITermDeferredStorageSvc myTerminologySvc;
/**
* Constructor
*/
public SaveDeferredJob() {
super(SCHEDULE_INTERVAL_MILLIS);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
public void execute(JobExecutionContext theContext) {
myTerminologySvc.saveDeferred();
}
}

View File

@@ -20,12 +20,12 @@ package ca.uhn.fhir.jpa.util;
* #L%
*/
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,14 +92,14 @@ public class ResourceCountCache {
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(ResourceCountCache.class.getName());
jobDetail.setJobClass(ResourceCountCache.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_MINUTE, false, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_MINUTE, jobDetail);
}
public static class SubmitJob implements Job {
public static class Job implements HapiJob {
@Autowired
private ResourceCountCache myTarget;

View File

@@ -1,16 +1,18 @@
package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.quartz.*;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -18,7 +20,6 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.util.ProxyUtils;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.util.AopTestUtils;
@@ -50,7 +51,7 @@ public class SchedulerServiceImplTest {
.setId(CountingJob.class.getName())
.setJobClass(CountingJob.class);
mySvc.scheduleFixedDelay(100, false, def);
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
@@ -67,12 +68,12 @@ public class SchedulerServiceImplTest {
.setId(CountingJob.class.getName())
.setJobClass(CountingJob.class);
SchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc);
BaseSchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc);
svc.stop();
svc.create();
svc.start();
svc.contextStarted(null);
mySvc.scheduleFixedDelay(100, false, def);
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
@@ -90,7 +91,7 @@ public class SchedulerServiceImplTest {
.setJobClass(CountingJob.class);
ourTaskDelay = 500;
mySvc.scheduleFixedDelay(100, false, def);
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
@@ -108,7 +109,7 @@ public class SchedulerServiceImplTest {
.setJobClass(CountingIntervalJob.class);
ourTaskDelay = 500;
mySvc.scheduleFixedDelay(100, false, def);
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(2000);
@@ -159,10 +160,7 @@ public class SchedulerServiceImplTest {
}
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public static class CountingIntervalJob extends FireAtIntervalJob {
public static class CountingIntervalJob implements HapiJob {
private static int ourCount;
@@ -171,26 +169,20 @@ public class SchedulerServiceImplTest {
private String myStringBean;
private ApplicationContext myAppCtx;
public CountingIntervalJob() {
super(500);
}
@Override
public void doExecute(JobExecutionContext theContext) {
public void execute(JobExecutionContext theContext) {
ourLog.info("Job has fired, going to sleep for {}ms", ourTaskDelay);
sleepAtLeast(ourTaskDelay);
ourCount++;
}
}
@Configuration
public static class TestConfiguration {
@Bean
public ISchedulerService schedulerService() {
return new SchedulerServiceImpl();
return new HapiSchedulerServiceImpl();
}
@Bean

View File

@@ -19,7 +19,6 @@ import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.TestUtil;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.After;
import org.junit.AfterClass;
@@ -157,6 +156,7 @@ public class SearchCoordinatorSvcImplTest {
}
// TODO INTERMITTENT this test fails intermittently
@Test
public void testAsyncSearchLargeResultSetBigCountSameCoordinator() {
List<ResourcePersistentId> allResults = new ArrayList<>();

View File

@@ -1,16 +1,13 @@
package ca.uhn.fhir.jpa.search.reindex;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
@@ -72,6 +69,8 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
private ISearchParamRegistry mySearchParamRegistry;
@Mock
private TransactionStatus myTxStatus;
@Mock
private ISchedulerService mySchedulerService;
@Override
protected FhirContext getContext() {
@@ -97,6 +96,7 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
mySvc.setResourceTableDaoForUnitTest(myResourceTableDao);
mySvc.setTxManagerForUnitTest(myTxManager);
mySvc.setSearchParamRegistryForUnitTest(mySearchParamRegistry);
mySvc.setSchedulerServiceForUnitTest(mySchedulerService);
mySvc.start();
when(myTxManager.getTransaction(any())).thenReturn(myTxStatus);

View File

@@ -3,11 +3,11 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
@@ -16,6 +16,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.test.utilities.JettyUtil;
import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -34,8 +35,6 @@ import java.util.List;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.*;
import ca.uhn.fhir.test.utilities.JettyUtil;
/**
* Test the rest-hook subscriptions
*/
@@ -101,7 +100,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourUpdatedPatients.clear();
ourContentTypes.clear();
mySchedulerService.logStatus();
mySchedulerService.logStatusForUnitTest();
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {

View File

@@ -27,18 +27,21 @@ import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class SchemaInitializationProvider implements ISchemaInitializationProvider {
private final String mySchemaFileClassPath;
private final String mySchemaExistsIndicatorTable;
/**
*
* @param theSchemaFileClassPath pathname to script used to initialize schema
* @param theSchemaExistsIndicatorTable a table name we can use to determine if this schema has already been initialized
*/
public SchemaInitializationProvider(String theSchemaFileClassPath, String theSchemaExistsIndicatorTable) {
@@ -50,18 +53,20 @@ public class SchemaInitializationProvider implements ISchemaInitializationProvid
public List<String> getSqlStatements(DriverTypeEnum theDriverType) {
List<String> retval = new ArrayList<>();
String initScript;
initScript = mySchemaFileClassPath + "/" + theDriverType.getSchemaFilename();
String initScript = mySchemaFileClassPath + "/" + getInitScript(theDriverType);
try {
InputStream sqlFileInputStream = SchemaInitializationProvider.class.getResourceAsStream(initScript);
if (sqlFileInputStream == null) {
throw new ConfigurationException("Schema initialization script " + initScript + " not found on classpath");
}
// Assumes no escaped semicolons...
String[] statements = IOUtils.toString(sqlFileInputStream, Charsets.UTF_8).split("\\;");
String sqlString = IOUtils.toString(sqlFileInputStream, Charsets.UTF_8);
String sqlStringNoComments = preProcessSqlString(theDriverType, sqlString);
String[] statements = sqlStringNoComments.split("\\;");
for (String statement : statements) {
if (!statement.trim().isEmpty()) {
retval.add(statement);
String cleanedStatement = preProcessSqlStatement(theDriverType, statement);
if (!isBlank(cleanedStatement)) {
retval.add(cleanedStatement);
}
}
} catch (IOException e) {
@@ -70,6 +75,19 @@ public class SchemaInitializationProvid
return retval;
}
protected String preProcessSqlString(DriverTypeEnum theDriverType, String sqlString) {
return sqlString;
}
protected String preProcessSqlStatement(DriverTypeEnum theDriverType, String sqlStatement) {
return sqlStatement;
}
@Nonnull
protected String getInitScript(DriverTypeEnum theDriverType) {
return theDriverType.getSchemaFilename();
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
@@ -93,3 +111,4 @@ public class SchemaInitializationProvid
return mySchemaExistsIndicatorTable;
}
}
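The new preProcessSqlString / preProcessSqlStatement / getInitScript hooks let a driver-specific subclass clean up its script before the (still escape-naive) semicolon split. As a hedged illustration only, here is what such a subclass could look like; the classpath, indicator table, and comment-stripping rule are invented for the example and are not part of this commit:

// Hypothetical subclass, not part of this commit: strips single-line SQL
// comments for one driver before the statements are split on ';'.
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;

public class ExampleSchemaProvider extends SchemaInitializationProvider {
	public ExampleSchemaProvider() {
		// Both constructor arguments are illustrative assumptions
		super("/example/schema/path", "EXAMPLE_INDICATOR_TABLE");
	}

	@Override
	protected String preProcessSqlString(DriverTypeEnum theDriverType, String sqlString) {
		// Remove "--" comment lines so they don't end up glued to statements
		return sqlString.replaceAll("(?m)^--.*$", "");
	}
}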

View File

@@ -57,6 +57,13 @@ public class Builder {
return this;
}
public Builder initializeSchema(String theVersion, String theSchemaName, ISchemaInitializationProvider theSchemaInitializationProvider) {
InitializeSchemaTask task = new InitializeSchemaTask(myRelease, theVersion, theSchemaInitializationProvider);
task.setDescription("Initialize " + theSchemaName + " schema");
mySink.addTask(task);
return this;
}
public Builder executeRawSql(String theVersion, DriverTypeEnum theDriver, @Language("SQL") String theSql) {
mySink.addTask(new ExecuteRawSqlTask(myRelease, theVersion).addSql(theDriver, theSql));
return this;

View File

@@ -1,65 +0,0 @@
package ca.uhn.fhir.jpa.model.sched;
/*-
* #%L
* HAPI FHIR Model
* %%
* Copyright (C) 2014 - 2019 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public abstract class FireAtIntervalJob implements Job {
public static final String NEXT_EXECUTION_TIME = "NEXT_EXECUTION_TIME";
private static final Logger ourLog = LoggerFactory.getLogger(FireAtIntervalJob.class);
private final long myMillisBetweenExecutions;
public FireAtIntervalJob(long theMillisBetweenExecutions) {
myMillisBetweenExecutions = theMillisBetweenExecutions;
}
@Override
public final void execute(JobExecutionContext theContext) {
Long nextExecution = (Long) theContext.getJobDetail().getJobDataMap().get(NEXT_EXECUTION_TIME);
if (nextExecution != null) {
long cutoff = System.currentTimeMillis();
if (nextExecution >= cutoff) {
return;
}
}
try {
doExecute(theContext);
} catch (Throwable t) {
ourLog.error("Job threw uncaught exception", t);
} finally {
long newNextExecution = System.currentTimeMillis() + myMillisBetweenExecutions;
theContext.getJobDetail().getJobDataMap().put(NEXT_EXECUTION_TIME, newNextExecution);
}
}
protected abstract void doExecute(JobExecutionContext theContext);
}

View File

@@ -1,8 +1,8 @@
package ca.uhn.fhir.jpa.sched;
package ca.uhn.fhir.jpa.model.sched;
/*-
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Model
* %%
* Copyright (C) 2014 - 2019 University Health Network
* %%
@@ -20,19 +20,9 @@ package ca.uhn.fhir.jpa.sched;
* #L%
*/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.annotation.PostConstruct;
public class QuartzTableSeeder {
@Autowired
private LocalContainerEntityManagerFactoryBean myEntityManagerFactory;
@PostConstruct
public void start() {
}
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
@DisallowConcurrentExecution
public interface HapiJob extends Job {
}

View File

@@ -0,0 +1,24 @@
package ca.uhn.fhir.jpa.model.sched;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import java.util.Set;
public interface IHapiScheduler {
void init() throws SchedulerException;
void start();
void shutdown();
boolean isStarted();
void clear() throws SchedulerException;
void logStatusForUnitTest();
void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
Set<JobKey> getJobKeysForUnitTest() throws SchedulerException;
}
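The split between ISchedulerService and IHapiScheduler means the service can hold two scheduler instances and route each registration to one of them (the test diff above already references BaseSchedulerServiceImpl and HapiSchedulerServiceImpl). A minimal delegation sketch, assuming hypothetical class and field names rather than the actual implementation in this commit:

// Illustrative sketch only: routing local vs. clustered registrations to two
// IHapiScheduler instances. All names here are assumptions.
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;

public class ExampleSchedulerService {
	private final IHapiScheduler myLocalScheduler;     // fires on every node
	private final IHapiScheduler myClusteredScheduler; // fires on one node per cluster

	public ExampleSchedulerService(IHapiScheduler theLocalScheduler, IHapiScheduler theClusteredScheduler) {
		myLocalScheduler = theLocalScheduler;
		myClusteredScheduler = theClusteredScheduler;
	}

	public void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
		myLocalScheduler.scheduleJob(theIntervalMillis, theJobDefinition);
	}

	public void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
		myClusteredScheduler.scheduleJob(theIntervalMillis, theJobDefinition);
	}
}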

View File

@@ -21,21 +21,37 @@ package ca.uhn.fhir.jpa.model.sched;
*/
import com.google.common.annotations.VisibleForTesting;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import java.util.Set;
public interface ISchedulerService {
@VisibleForTesting
void purgeAllScheduledJobsForUnitTest() throws SchedulerException;
void logStatus();
void logStatusForUnitTest();
/**
* This task will execute locally (and should execute on all nodes of the cluster if there is a cluster)
* @param theIntervalMillis How many milliseconds between passes should this job run
* @param theClusteredTask If <code>true</code>, only one instance of this task will fire across the whole cluster (when running in a clustered environment). If <code>false</code>, or if not running in a clustered environment, this task will execute locally (and should execute on all nodes of the cluster)
* @param theJobDefinition The Job to fire
*/
void scheduleFixedDelay(long theIntervalMillis, boolean theClusteredTask, ScheduledJobDefinition theJobDefinition);
void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
/**
* Only one instance of this task will fire across the whole cluster (when running in a clustered environment).
* @param theIntervalMillis How many milliseconds between passes should this job run
* @param theJobDefinition The Job to fire
*/
void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
@VisibleForTesting
Set<JobKey> getLocalJobKeysForUnitTest() throws SchedulerException;
@VisibleForTesting
Set<JobKey> getClusteredJobKeysForUnitTest() throws SchedulerException;
boolean isStopping();
}
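None of the call sites migrated in this commit use the clustered variant yet; every service above registers with scheduleLocalJob. As a hedged sketch of what a clustered registration would look like under this interface (the service name, interval, and job body are invented for illustration):

// Hypothetical example, not from this commit: a job that should fire on
// only one node of the cluster per interval.
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;

public class ExampleNightlySweepSvc {
	@Autowired
	private ISchedulerService mySchedulerService;

	@PostConstruct
	public void scheduleJob() {
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(getClass().getName());
		jobDetail.setJobClass(Job.class);
		// Clustered: exactly one node fires this job each interval
		mySchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
	}

	public static class Job implements HapiJob {
		@Autowired
		private ExampleNightlySweepSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.doSweep();
		}
	}

	void doSweep() {
		// hypothetical work goes here
	}
}

This mirrors the scheduleJob()/inner-Job pattern used throughout the commit, with scheduleClusteredJob swapped in for scheduleLocalJob.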

View File

@@ -0,0 +1,10 @@
package ca.uhn.fhir.jpa.model.sched;
public interface ISmartLifecyclePhase {
// POST_CONSTRUCT is here as a marker for where post-construct fits into the smart lifecycle. Beans with negative phases
// will be started before @PostConstruct methods are called
int POST_CONSTRUCT = 0;
// We want to start scheduled tasks fairly late in the startup process
int SCHEDULER_1000 = 1000;
}
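For illustration, a bean that wants to start in the late scheduler phase would implement Spring's SmartLifecycle and report SCHEDULER_1000 from getPhase(). This sketch is an assumption about intended usage, not code from this commit:

// Illustrative only: a lifecycle bean started in the SCHEDULER_1000 phase,
// i.e. after the @PostConstruct (phase 0) beans.
import ca.uhn.fhir.jpa.model.sched.ISmartLifecyclePhase;
import org.springframework.context.SmartLifecycle;

public class ExampleSchedulerLifecycle implements SmartLifecycle {
	private boolean myRunning;

	@Override
	public int getPhase() {
		return ISmartLifecyclePhase.SCHEDULER_1000;
	}

	@Override
	public void start() {
		// start scheduled tasks here
		myRunning = true;
	}

	@Override
	public void stop() {
		myRunning = false;
	}

	@Override
	public boolean isRunning() {
		return myRunning;
	}
}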

View File

@@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.model.sched;
*/
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.quartz.Job;
import java.util.Collections;
@@ -28,10 +29,9 @@ import java.util.HashMap;
import java.util.Map;
public class ScheduledJobDefinition {
private Class<? extends Job> myJobClass;
private String myId;
private String myGroup;
private Map<String, String> myJobData;
public Map<String, String> getJobData() {
@@ -60,6 +60,15 @@ public class ScheduledJobDefinition {
return this;
}
public String getGroup() {
return myGroup;
}
public ScheduledJobDefinition setGroup(String theGroup) {
myGroup = theGroup;
return this;
}
public void addJobData(String thePropertyName, String thePropertyValue) {
Validate.notBlank(thePropertyName);
if (myJobData == null) {
@@ -68,4 +77,13 @@ public class ScheduledJobDefinition {
Validate.isTrue(myJobData.containsKey(thePropertyName) == false);
myJobData.put(thePropertyName, thePropertyValue);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myJobClass", myJobClass)
.append("myId", myId)
.append("myGroup", myGroup)
.toString();
}
}

View File

@@ -1,40 +0,0 @@
package ca.uhn.fhir.jpa.model.sched;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.quartz.JobExecutionContext;
import static ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob.NEXT_EXECUTION_TIME;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class FireAtIntervalJobTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private JobExecutionContext myJobExecutionContext;
@Test
public void testExecutionThrowsException() {
FireAtIntervalJob job = new FireAtIntervalJob(1000) {
@Override
protected void doExecute(JobExecutionContext theContext) {
throw new NullPointerException();
}
};
// No exception thrown please
job.execute(myJobExecutionContext);
verify(myJobExecutionContext.getJobDetail().getJobDataMap(), times(1)).put(eq(NEXT_EXECUTION_TIME), anyLong());
}
}

View File

@@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.model.search.StorageProcessingMessage;
@@ -47,27 +48,15 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.Extension;
import org.hl7.fhir.dstu3.model.SearchParameter;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.r4.model.Reference;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -735,11 +724,21 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry {
}
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SearchParamRegistryImpl.class.getName());
jobDetail.setJobClass(SubmitJob.class);
mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, false, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private ISearchParamRegistry myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.refreshCacheIfNecessary();
}
}
@Override
@@ -778,17 +777,6 @@ public class SearchParamRegistryImpl implements ISearchParamRegistry {
}
}
public static class SubmitJob implements Job {
@Autowired
private ISearchParamRegistry myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.refreshCacheIfNecessary();
}
}
public static Map<String, Map<String, RuntimeSearchParam>> createBuiltInSearchParamMap(FhirContext theFhirContext) {
Map<String, Map<String, RuntimeSearchParam>> resourceNameToSearchParams = new HashMap<>();

View File

@@ -6,8 +6,6 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseDatatype;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.SearchParameter;
@@ -21,7 +19,8 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)

View File

@@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/
import ca.uhn.fhir.jpa.api.IDaoRegistry;
import ca.uhn.fhir.jpa.model.sched.FireAtIntervalJob;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@@ -33,9 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Subscription;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,7 +50,6 @@ import java.util.concurrent.Semaphore;
@Service
@Lazy
public class SubscriptionLoader {
public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
private final Object mySyncSubscriptionsLock = new Object();
@@ -90,11 +87,21 @@ public class SubscriptionLoader {
@PostConstruct
public void registerScheduledJob() {
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(SubscriptionLoader.class.getName());
jobDetail.setJobClass(SubscriptionLoader.SubmitJob.class);
mySchedulerService.scheduleFixedDelay(REFRESH_INTERVAL, false, jobDetail);
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private SubscriptionLoader myTarget;
@Override
public void execute(JobExecutionContext theContext) {
myTarget.syncSubscriptions();
}
}
@VisibleForTesting
@@ -159,21 +166,5 @@ public class SubscriptionLoader {
public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
mySubscriptionProvider = theSubscriptionProvider;
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public static class SubmitJob extends FireAtIntervalJob {
@Autowired
private SubscriptionLoader myTarget;
public SubmitJob() {
super(REFRESH_INTERVAL);
}
@Override
protected void doExecute(JobExecutionContext theContext) {
myTarget.syncSubscriptions();
}
}
}

View File

@@ -4,7 +4,10 @@ import ca.uhn.fhir.jpa.api.IDaoRegistry;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscriptionProvider;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;

View File

@@ -1,5 +1,6 @@
package ca.uhn.fhirtest.interceptor;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
@@ -13,7 +14,6 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,7 +55,15 @@ public class AnalyticsInterceptor extends InterceptorAdapter {
}
}
public static class SubmitJob implements Job {
@PostConstruct
public void start() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(5000, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private AnalyticsInterceptor myAnalyticsInterceptor;
@@ -65,14 +73,6 @@ public class AnalyticsInterceptor extends InterceptorAdapter {
}
}
@PostConstruct
public void start() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(SubmitJob.class);
mySchedulerService.scheduleFixedDelay(5000, false, jobDetail);
}
@PreDestroy
public void stop() throws IOException {
if (myHttpClient instanceof CloseableHttpClient) {