3558 bulk export premature deletion (#3562)

* Implementation, parameterize test, changelog

* Bump spring framework for a vuln while i'm in here

* Fix number of changelog

* Add IS NOT NULL to query
This commit is contained in:
Tadgh 2022-04-23 18:46:44 -07:00 committed by GitHub
parent 4c9667c92a
commit f7a31287b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 188 additions and 24 deletions

View File

@ -0,0 +1,5 @@
type: fix
issue: 3561
jira: SMILE-3728
title: "Previously, Bulk Export jobs automatically cleared the collection after a hardcoded 2-hour time period from the start of the job. This is now configurable via a new DaoConfig property,
`setBulkExportFileRetentionPeriodHours()`. If this is set to a value of 0 or below, then the collections will never be expired."

View File

@ -200,7 +200,7 @@ public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJob
Optional<BulkExportJobEntity> jobToDelete = myTxTemplate.execute(t -> {
Pageable page = PageRequest.of(0, 1);
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findByExpiry(page, new Date());
Slice<BulkExportJobEntity> submittedJobs = myBulkExportJobDao.findNotRunningByExpiry(page, new Date());
if (submittedJobs.isEmpty()) {
return Optional.empty();
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
@ -78,10 +79,11 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myContext;
@Autowired
private DaoConfig myDaoConfig;
private Set<String> myCompartmentResources;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@Transactional
@Override
@ -233,8 +235,18 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
.setStatus(theJob.getStatus());
}
/**
* Sets the expiry of the given job based on the configured Bulk Export file retention period
* ({@code myDaoConfig.getBulkExportFileRetentionPeriodHours()}).
* <p>
* If the configured retention period is greater than 0, the job's expiry is set to that many hours from the
* current time. If it is 0 or negative, the job's expiry is set to {@code null}, which prevents the job from
* getting expired.
*
* @param theJob the job to update the expiry for.
*/
private void updateExpiry(BulkExportJobEntity theJob) {
theJob.setExpiry(DateUtils.addMilliseconds(new Date(), myRetentionPeriod));
if (myDaoConfig.getBulkExportFileRetentionPeriodHours() > 0) {
// Positive retention period: expire the job that many hours from now.
theJob.setExpiry(DateUtils.addHours(new Date(), myDaoConfig.getBulkExportFileRetentionPeriodHours()));
} else {
// Zero or negative retention period: a null expiry means the job is never expired.
theJob.setExpiry(null);
}
}
@Transactional

View File

@ -43,6 +43,9 @@ public interface IBulkExportJobDao extends JpaRepository<BulkExportJobEntity, Lo
/**
* Finds jobs whose expiry is earlier than the given cutoff. The job status is not considered.
*
* @param thePage the page of results to fetch
* @param theCutoff jobs with an expiry before this date are returned
* @return a slice of matching jobs
*/
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry < :cutoff")
Slice<BulkExportJobEntity> findByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
/**
* Finds jobs that have a non-null expiry earlier than the given cutoff and whose status is not
* {@code BUILDING}. Jobs with a null expiry are never returned by this query.
*
* @param thePage the page of results to fetch
* @param theCutoff jobs with an expiry before this date are returned
* @return a slice of matching jobs
*/
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myExpiry IS NOT NULL and j.myExpiry < :cutoff AND j.myStatus <> 'BUILDING'")
Slice<BulkExportJobEntity> findNotRunningByExpiry(Pageable thePage, @Param("cutoff") Date theCutoff);
/**
* Finds jobs with the given request string that were created after the given date and whose status is
* not the given status, ordered by creation date with the most recently created first.
*
* @param thePage the page of results to fetch
* @param theRequest the request string to match exactly
* @param theCreatedAfter only jobs created after this date are returned
* @param theNotStatus jobs with this status are excluded
* @return a slice of matching jobs
*/
@Query("SELECT j FROM BulkExportJobEntity j WHERE j.myRequest = :request AND j.myCreated > :createdAfter AND j.myStatus <> :status ORDER BY j.myCreated DESC")
Slice<BulkExportJobEntity> findExistingJob(Pageable thePage, @Param("request") String theRequest, @Param("createdAfter") Date theCreatedAfter, @Param("status") BulkExportJobStatusEnum theNotStatus);

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao.search;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import org.hibernate.search.engine.backend.common.DocumentReference;
import org.hibernate.search.engine.search.query.SearchScroll;

View File

@ -78,7 +78,7 @@ public class BulkExportJobEntity implements Serializable {
@Column(name = "STATUS_TIME", nullable = false)
private Date myStatusTime;
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "EXP_TIME", nullable = false)
@Column(name = "EXP_TIME", nullable = true)
private Date myExpiry;
@Column(name = "REQUEST", nullable = false, length = REQUEST_LENGTH)
private String myRequest;
@ -146,7 +146,7 @@ public class BulkExportJobEntity implements Serializable {
if (myStatus != null) {
b.append("status", myStatus + " " + new InstantType(myStatusTime).getValueAsString());
}
b.append("created", new InstantType(myExpiry).getValueAsString());
b.append("created", new InstantType(myCreated).getValueAsString());
b.append("expiry", new InstantType(myExpiry).getValueAsString());
b.append("request", myRequest);
b.append("since", mySince);

View File

@ -283,6 +283,11 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.addColumn("20220416.1", "CUR_GATED_STEP_ID")
.nullable()
.type(ColumnTypeEnum.STRING, 100);
//Make Job expiry nullable so that we can prevent job expiry by using a null value.
version
.onTable("HFJ_BLK_EXPORT_JOB").modifyColumn("20220423.1", "EXP_TIME").nullable().withType(ColumnTypeEnum.DATE_TIMESTAMP);
}
/**

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.search.builder;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.apache.commons.lang3.Validate;

View File

@ -54,6 +54,9 @@ import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -77,6 +80,7 @@ import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -130,10 +134,40 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@Test
public void testPurgeExpiredJobs() {
// Create an expired job
/**
* Supplies the arguments for {@code testBulkExportExpiryRules}. Each set of arguments is in the format of:
*
* 1. Bulk Job status
* 2. Expiry Date that should be set on the job (may be null)
* 3. How many jobs should be left after running a purge pass.
*/
static Stream<Arguments> bulkExpiryStatusProvider() {
// One hour in the past: an expiry that has already elapsed.
Date previousTime = DateUtils.addHours(new Date(), -1);
// 50 hours in the future: an expiry that has not yet elapsed.
Date futureTime = DateUtils.addHours(new Date(), 50);
return Stream.of(
//Finished jobs with standard expiries.
Arguments.of(BulkExportJobStatusEnum.COMPLETE, previousTime, 0),
Arguments.of(BulkExportJobStatusEnum.COMPLETE, futureTime, 1),
//Finished job with null expiry
Arguments.of(BulkExportJobStatusEnum.COMPLETE, null, 1),
//Expired errored job.
Arguments.of(BulkExportJobStatusEnum.ERROR, previousTime, 0),
//Unexpired errored job.
Arguments.of(BulkExportJobStatusEnum.ERROR, futureTime, 1),
//Expired job but currently still running.
Arguments.of(BulkExportJobStatusEnum.BUILDING, previousTime, 1)
);
}
@ParameterizedTest
@MethodSource("bulkExpiryStatusProvider")
public void testBulkExportExpiryRules(BulkExportJobStatusEnum theStatus, Date theExpiry, int theExpectedCountAfterPurge) {
runInTransaction(() -> {
Binary b = new Binary();
@ -141,8 +175,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
String binaryId = myBinaryDao.create(b, new SystemRequestDetails()).getId().toUnqualifiedVersionless().getValue();
BulkExportJobEntity job = new BulkExportJobEntity();
job.setStatus(BulkExportJobStatusEnum.COMPLETE);
job.setExpiry(DateUtils.addHours(new Date(), -1));
job.setStatus(theStatus);
job.setExpiry(theExpiry);
job.setJobId(UUID.randomUUID().toString());
job.setCreated(new Date());
job.setRequest("$export");
@ -159,7 +193,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
file.setCollection(collection);
file.setResource(binaryId);
myBulkExportCollectionFileDao.save(file);
});
// Check that things were created
@ -173,14 +206,13 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Run a purge pass
myBulkDataExportJobSchedulingHelper.purgeExpiredFiles();
// Check that things were deleted
// Check for correct remaining resources based on inputted rules from method provider.
runInTransaction(() -> {
assertEquals(0, myResourceTableDao.count());
assertThat(myBulkExportJobDao.findAll(), Matchers.empty());
assertEquals(0, myBulkExportCollectionDao.count());
assertEquals(0, myBulkExportCollectionFileDao.count());
assertEquals(theExpectedCountAfterPurge, myResourceTableDao.count());
assertEquals(theExpectedCountAfterPurge, myBulkExportJobDao.findAll().size());
assertEquals(theExpectedCountAfterPurge, myBulkExportCollectionDao.count());
assertEquals(theExpectedCountAfterPurge, myBulkExportCollectionFileDao.count());
});
}
@Test

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.batch2.api;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
public interface IJobCompletionHandler<PT extends IModelJson> {

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.api.model.WarmCacheEntry;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceEncodingEnum;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.validation.FhirValidator;
@ -308,6 +309,11 @@ public class DaoConfig {
*/
private long myAutoInflateBinariesMaximumBytes = 10 * FileUtils.ONE_MB;
/**
* Since 6.0.0
*/
private int myBulkExportFileRetentionPeriodHours = 2;
/**
* Constructor
*/
@ -2858,7 +2864,28 @@ public class DaoConfig {
return myAutoInflateBinariesMaximumBytes;
}
public enum StoreMetaSourceInformationEnum {
/**
* This setting controls how long Bulk Export collection entities will be retained after job start.
* Default is 2 hours. Setting this value to 0 or less will cause Bulk Export collection entities to never be expired.
*
* @return the configured retention period, in hours
* @since 6.0.0
*/
public int getBulkExportFileRetentionPeriodHours() {
return myBulkExportFileRetentionPeriodHours;
}
/**
* This setting controls how long Bulk Export collection entities will be retained after job start.
* Default is 2 hours. Setting this value to 0 or less will cause Bulk Export collection entities to never be expired.
*
* @param theBulkExportFileRetentionPeriodHours the retention period, in hours; a value of 0 or less disables expiry
* @since 6.0.0
*/
public void setBulkExportFileRetentionPeriodHours(int theBulkExportFileRetentionPeriodHours) {
myBulkExportFileRetentionPeriodHours = theBulkExportFileRetentionPeriodHours;
}
public enum StoreMetaSourceInformationEnum {
NONE(false, false),
SOURCE_URI(true, false),
REQUEST_ID(false, true),
@ -2948,7 +2975,7 @@ public class DaoConfig {
NON_VERSIONED,
/**
* Tags are stored directly in the resource body (in the {@link ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable}
* Tags are stored directly in the resource body (in the {@link ResourceHistoryTable}
* entry for the resource, meaning that they are not indexed separately, and are versioned with the rest
* of the resource.
*/

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.dao.expunge;
/*-
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.dao.expunge;
/*-
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao.expunge;
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.api.server.RequestDetails;
import javax.annotation.Nullable;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.dao.expunge;
/*-
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.dao.expunge;
/*-
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%

View File

@ -822,7 +822,7 @@
<swagger_version>2.1.12</swagger_version>
<slf4j_version>1.7.33</slf4j_version>
<log4j_to_slf4j_version>2.17.1</log4j_to_slf4j_version>
<spring_version>5.3.18</spring_version>
<spring_version>5.3.19</spring_version>
<spring_data_version>2.6.1</spring_data_version>
<spring_batch_version>4.3.3</spring_batch_version>
<spring_boot_version>2.6.2</spring_boot_version>