Add scheduled task enablement config (#2652)

* Add scheduled task enablement config

* Add changelog

* Test fix

* Cleanup
James Agnew 2021-05-12 14:38:30 -04:00 committed by GitHub
parent f3a6bce3d0
commit b64346ba1e
9 changed files with 182 additions and 28 deletions


@ -0,0 +1,4 @@
---
type: add
issue: 2652
title: "Settings have been added to the JPA Server DaoConfig to enable/disable various individual kinds of scheduled tasks."


@ -86,6 +86,14 @@ public class DaoConfig {
private static final Logger ourLog = LoggerFactory.getLogger(DaoConfig.class);
private static final int DEFAULT_EXPUNGE_BATCH_SIZE = 800;
private static final int DEFAULT_MAXIMUM_DELETE_CONFLICT_COUNT = 60;
/**
* This constant applies to task enablement, e.g. {@link #setEnableTaskStaleSearchCleanup(boolean)}.
*
* By default, all scheduled tasks are enabled.
*/
public static final boolean DEFAULT_ENABLE_TASKS = true;
/**
* Child Configurations
*/
@ -205,6 +213,27 @@ public class DaoConfig {
* @since 5.4.0
*/
private boolean myMatchUrlCache;
/**
* @since 5.5.0
*/
private boolean myEnableTaskBulkImportJobExecution;
/**
* @since 5.5.0
*/
private boolean myEnableTaskStaleSearchCleanup;
/**
* @since 5.5.0
*/
private boolean myEnableTaskPreExpandValueSets;
/**
* @since 5.5.0
*/
private boolean myEnableTaskResourceReindexing;
/**
* @since 5.5.0
*/
private boolean myEnableTaskBulkExportJobExecution;
/**
* Constructor
@ -215,6 +244,13 @@ public class DaoConfig {
setExpungeThreadCount(Runtime.getRuntime().availableProcessors());
setBundleTypesAllowedForStorage(DEFAULT_BUNDLE_TYPES_ALLOWED_FOR_STORAGE);
// Scheduled tasks are all enabled by default
setEnableTaskBulkImportJobExecution(DEFAULT_ENABLE_TASKS);
setEnableTaskBulkExportJobExecution(DEFAULT_ENABLE_TASKS);
setEnableTaskStaleSearchCleanup(DEFAULT_ENABLE_TASKS);
setEnableTaskPreExpandValueSets(DEFAULT_ENABLE_TASKS);
setEnableTaskResourceReindexing(DEFAULT_ENABLE_TASKS);
if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) { if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) {
ourLog.info("Status based reindexing is DISABLED"); ourLog.info("Status based reindexing is DISABLED");
setStatusBasedReindexingDisabled(true); setStatusBasedReindexingDisabled(true);
@ -2181,6 +2217,107 @@ public class DaoConfig {
// ignore
}
/**
* If this is enabled (this is the default), this server will attempt to activate and run <b>Bulk Import</b>
* batch jobs. Otherwise, this server will not.
*
* @since 5.5.0
*/
public boolean isEnableTaskBulkImportJobExecution() {
return myEnableTaskBulkImportJobExecution;
}
/**
* If this is enabled (this is the default), this server will attempt to activate and run <b>Bulk Import</b>
* batch jobs. Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setEnableTaskBulkImportJobExecution(boolean theEnableTaskBulkImportJobExecution) {
myEnableTaskBulkImportJobExecution = theEnableTaskBulkImportJobExecution;
}
/**
* If this is enabled (this is the default), this server will attempt to activate and run <b>Bulk Export</b>
* batch jobs. Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setEnableTaskBulkExportJobExecution(boolean theEnableTaskBulkExportJobExecution) {
myEnableTaskBulkExportJobExecution = theEnableTaskBulkExportJobExecution;
}
/**
* If this is enabled (this is the default), this server will attempt to activate and run <b>Bulk Export</b>
* batch jobs. Otherwise, this server will not.
*
* @since 5.5.0
*/
public boolean isEnableTaskBulkExportJobExecution() {
return myEnableTaskBulkExportJobExecution;
}
/**
* If this is enabled (this is the default), this server will attempt to pre-expand any ValueSets that
* have been uploaded and are not yet pre-expanded. Otherwise, this server will not.
*
* @since 5.5.0
*/
public boolean isEnableTaskPreExpandValueSets() {
return myEnableTaskPreExpandValueSets;
}
/**
* If this is enabled (this is the default), this server will attempt to pre-expand any ValueSets that
* have been uploaded and are not yet pre-expanded. Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setEnableTaskPreExpandValueSets(boolean theEnableTaskPreExpandValueSets) {
myEnableTaskPreExpandValueSets = theEnableTaskPreExpandValueSets;
}
/**
* If this is enabled (this is the default), this server will periodically scan for and try to delete
* stale searches in the database. Otherwise, this server will not.
*
* @since 5.5.0
*/
public boolean isEnableTaskStaleSearchCleanup() {
return myEnableTaskStaleSearchCleanup;
}
/**
* If this is enabled (this is the default), this server will periodically scan for and try to delete
* stale searches in the database. Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setEnableTaskStaleSearchCleanup(boolean theEnableTaskStaleSearchCleanup) {
myEnableTaskStaleSearchCleanup = theEnableTaskStaleSearchCleanup;
}
/**
* If this is enabled (this is the default), this server will attempt to run resource reindexing jobs.
* Otherwise, this server will not.
*
* @since 5.5.0
*/
public void setEnableTaskResourceReindexing(boolean theEnableTaskResourceReindexing) {
myEnableTaskResourceReindexing = theEnableTaskResourceReindexing;
}
/**
* If this is enabled (this is the default), this server will attempt to run resource reindexing jobs.
* Otherwise, this server will not.
*
* @since 5.5.0
*/
public boolean isEnableTaskResourceReindexing() {
return myEnableTaskResourceReindexing;
}
public enum StoreMetaSourceInformationEnum {
NONE(false, false),
SOURCE_URI(true, false),


@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.bulk.export.svc;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
@ -129,6 +130,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional(value = Transactional.TxType.NEVER)
@Override
public synchronized void buildExportFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToProcessOpt = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
@ -176,6 +180,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
@Autowired
private DaoConfig myDaoConfig;
/**
* This method is called by the scheduler to run a pass of the
@ -184,6 +190,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Transactional(value = Transactional.TxType.NEVER)
@Override
public void purgeExpiredFiles() {
if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
return;
}
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date());


@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.bulk.imprt.svc;
* #L%
*/
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
@ -58,14 +60,15 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
private final Semaphore myRunningJobSemaphore = new Semaphore(1);
@Autowired
private IBulkImportJobDao myJobDao;
@Autowired
private IBulkImportJobFileDao myJobFileDao;
@Autowired
@ -78,6 +81,8 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
@Autowired
@Qualifier(BatchJobsConfig.BULK_IMPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkImportJob;
@Autowired
private DaoConfig myDaoConfig;
@PostConstruct
public void start() {
@ -155,7 +160,24 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
@Transactional(value = Transactional.TxType.NEVER)
@Override
public boolean activateNextReadyJob() {
if (!myDaoConfig.isEnableTaskBulkImportJobExecution()) {
Logs.getBatchTroubleshootingLog().trace("Bulk import job execution is not enabled on this server. No action taken.");
return false;
}
if (!myRunningJobSemaphore.tryAcquire()) {
Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
return false;
}
try {
return doActivateNextReadyJob();
} finally {
myRunningJobSemaphore.release();
}
}
private boolean doActivateNextReadyJob() {
Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
@ -243,7 +265,7 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, jobId)
.addLong(BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL, (long) batchSize);
if (isNotBlank(theBulkExportJobEntity.getJobDescription())) {
parameters.addString(BulkExportJobConfig.JOB_DESCRIPTION, theBulkExportJobEntity.getJobDescription());
}


@ -78,7 +78,7 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
@Transactional(propagation = Propagation.NEVER)
@Override
public synchronized void schedulePollForStaleSearches() {
if (!myDaoConfig.isSchedulingDisabled() && myDaoConfig.isEnableTaskStaleSearchCleanup()) {
pollForStaleSearchesAndDeleteThem();
}
}


@ -256,7 +256,7 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
@Override
@Transactional(Transactional.TxType.NEVER)
public Integer runReindexingPass() {
if (myDaoConfig.isSchedulingDisabled() || !myDaoConfig.isEnableTaskResourceReindexing()) {
return null;
}
if (myIndexingLock.tryLock()) {


@ -1725,6 +1725,9 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
@Override
public synchronized void preExpandDeferredValueSetsToTerminologyTables() {
if (!myDaoConfig.isEnableTaskPreExpandValueSets()) {
return;
}
if (isNotSafeToPreExpandValueSets()) {
ourLog.info("Skipping scheduled pre-expansion of ValueSets while deferred entities are being loaded.");
return;


@ -16,7 +16,7 @@
<name>HAPI FHIR JPA Server - Batch Task Processor</name>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring_batch_version}</version>
@ -59,7 +59,7 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>


@ -1,22 +0,0 @@
package ca.uhn.fhir.jpa.batch.svc;
/*-
* #%L
* HAPI FHIR JPA Server - Batch Task Processor
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/