diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_3_0/4284-expunge-deleted-resources-race-condition.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_3_0/4284-expunge-deleted-resources-race-condition.yaml new file mode 100644 index 00000000000..72b3d06e9a6 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_3_0/4284-expunge-deleted-resources-race-condition.yaml @@ -0,0 +1,6 @@ +--- +type: fix +issue: 4284 +jira: SMILE-5196 +title: "Previously, an `ExecutionException` was thrown when `$expunge` operations were used with a small batch size +and a low thread count due to a race condition involving the expunge limit. This has been corrected." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ResourceExpungeService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ResourceExpungeService.java index 99f263ef47f..627606f9da0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ResourceExpungeService.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ResourceExpungeService.java @@ -127,6 +127,10 @@ public class ResourceExpungeService implements IResourceExpungeService { @Override @Transactional public List findHistoricalVersionsOfNonDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) { + if(isEmptyQuery(theRemainingCount)){ + return Collections.EMPTY_LIST; + } + Pageable page = PageRequest.of(0, theRemainingCount); Slice ids; @@ -150,6 +154,10 @@ public class ResourceExpungeService implements IResourceExpungeService { @Override @Transactional public List findHistoricalVersionsOfDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) { + if(isEmptyQuery(theRemainingCount)){ + return Collections.EMPTY_LIST; + } + Pageable page = PageRequest.of(0, theRemainingCount); Slice ids; if (theResourceId != null) { @@ -172,7 +180,7 @@ public class ResourceExpungeService implements IResourceExpungeService { public void expungeCurrentVersionOfResources(RequestDetails theRequestDetails, List theResourceIds, AtomicInteger theRemainingCount) { for (ResourcePersistentId next : theResourceIds) { expungeCurrentVersionOfResource(theRequestDetails, next.getIdAsLong(), theRemainingCount); - if (theRemainingCount.get() <= 0) { + if (expungeLimitReached(theRemainingCount)) { return; } } @@ -230,7 +238,7 @@ public class ResourceExpungeService implements IResourceExpungeService { public void expungeHistoricalVersionsOfIds(RequestDetails theRequestDetails, List theResourceIds, AtomicInteger theRemainingCount) { for (ResourcePersistentId next : theResourceIds) { expungeHistoricalVersionsOfId(theRequestDetails, next.getIdAsLong(), theRemainingCount); - if (theRemainingCount.get() <= 0) { + if (expungeLimitReached(theRemainingCount)) { return; } } @@ -241,7 +249,7 @@ public class ResourceExpungeService implements IResourceExpungeService { public void expungeHistoricalVersions(RequestDetails theRequestDetails, List theHistoricalIds, AtomicInteger theRemainingCount) { for (ResourcePersistentId next : theHistoricalIds) { expungeHistoricalVersion(theRequestDetails, next.getIdAsLong(), theRemainingCount); - if (theRemainingCount.get() <= 0) { + if (expungeLimitReached(theRemainingCount)) { return; } } @@ -315,15 +323,21 @@ public class ResourceExpungeService implements IResourceExpungeService { } private void expungeHistoricalVersionsOfId(RequestDetails theRequestDetails, Long myResourceId, AtomicInteger theRemainingCount) { - ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new); + Pageable page; + synchronized (theRemainingCount){ + if (expungeLimitReached(theRemainingCount)) { + return; + } + page = PageRequest.of(0, theRemainingCount.get()); + } - Pageable page = PageRequest.of(0, theRemainingCount.get()); + ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new); Slice versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion()); ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue()); for (Long nextVersionId : versionIds) { expungeHistoricalVersion(theRequestDetails, nextVersionId, theRemainingCount); - if (theRemainingCount.get() <= 0) { + if (expungeLimitReached(theRemainingCount)) { return; } } @@ -333,4 +347,12 @@ public class ResourceExpungeService implements IResourceExpungeService { Validate.notNull(myVersion); return new SliceImpl<>(Collections.singletonList(myVersion.getId())); } + + private boolean isEmptyQuery(int theCount){ + return theCount <= 0; + } + + private boolean expungeLimitReached(AtomicInteger theRemainingCount) { + return theRemainingCount.get() <= 0; + } } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ExpungeR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ExpungeR4Test.java index a38aae55975..634b94af773 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ExpungeR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ExpungeR4Test.java @@ -15,11 +15,13 @@ import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.model.util.UcumServiceUtil; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.param.ReferenceParam; import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException; @@ -53,6 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import static org.awaitility.Awaitility.await; @@ -774,4 +777,66 @@ public class ExpungeR4Test extends BaseResourceProviderR4Test { // re-enable multi-delete for clean-up myDaoConfig.setAllowMultipleDelete(true); } + + @Test + public void testExpungeRaceConditionsWithLowThreadCountAndBatchSize() { + final SystemRequestDetails requestDetails = new SystemRequestDetails(); + final int numPatients = 5; + myDaoConfig.setExpungeThreadCount(2); + myDaoConfig.setExpungeBatchSize(2); + + List patients = createPatientsWithForcedIds(numPatients); + patients = updatePatients(patients, 1); + deletePatients(patients); + + int expectedPatientHistoryRecords = 15; // 5 resources x 3 versions + int actualPatientHistoryRecords = myPatientDao.history(null, null, null, requestDetails).getAllResources().size(); + assertEquals(expectedPatientHistoryRecords, actualPatientHistoryRecords); + + int expungeLimit = numPatients; + ExpungeOptions expungeOptions = new ExpungeOptions() + .setLimit(numPatients) + .setExpungeDeletedResources(true) + .setExpungeOldVersions(true); + + myPatientDao.expunge(expungeOptions, requestDetails); + + int maximumRemainingPatientHistoryRecords = expectedPatientHistoryRecords - expungeLimit; + int actualRemainingPatientHistoryRecords = myPatientDao.history(null, null, null, requestDetails).getAllResources().size(); + + // Note that the limit used in ExpungeOptions is meant to be a rough throttle. + // We care that AT LEAST the specified number of resources are expunged and not if the limit is exceeded. + assertTrue(actualRemainingPatientHistoryRecords <= maximumRemainingPatientHistoryRecords); + } + + private List createPatientsWithForcedIds(int theNumPatients) { + RequestDetails requestDetails = new SystemRequestDetails(); + List createdPatients = new ArrayList<>(); + for(int i = 1; i <= theNumPatients; i++){ + Patient patient = new Patient(); + patient.setId("pt-00" + i); + patient.getNameFirstRep().addGiven("Patient-"+i); + Patient createdPatient = (Patient)myPatientDao.update(patient, requestDetails).getResource(); + createdPatients.add(createdPatient); + } + return createdPatients; + } + + private List updatePatients(List thePatients, int theUpdateNumber) { + RequestDetails requestDetails = new SystemRequestDetails(); + List updatedPatients = new ArrayList<>(); + for(Patient patient : thePatients){ + patient.getNameFirstRep().addGiven("Update-" + theUpdateNumber); + Patient updatedPatient = (Patient)myPatientDao.update(patient, requestDetails).getResource(); + updatedPatients.add(updatedPatient); + } + return updatedPatients; + } + + private void deletePatients(List thePatients){ + RequestDetails requestDetails = new SystemRequestDetails(); + for(Patient patient : thePatients){ + myPatientDao.delete(patient.getIdElement(), requestDetails); + } + } }