Fix Expunge Deleted Resources Race Condition (#4285)
* failing test * fix race condition * change log * refactor * clean up * remove ExpungeOptions hard limit enforcement * nitpick Co-authored-by: nathaniel.doef <nathaniel.doef@smilecdr.com>
This commit is contained in:
parent
9eeb1545a1
commit
028c3ed5c1
|
@ -0,0 +1,6 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 4284
|
||||||
|
jira: SMILE-5196
|
||||||
|
title: "Previously, an `ExecutionException` was thrown when `$expunge` operations were used with a small batch size
|
||||||
|
and a low thread count due to a race condition involving the expunge limit. This has been corrected."
|
|
@ -127,6 +127,10 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public List<ResourcePersistentId> findHistoricalVersionsOfNonDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) {
|
public List<ResourcePersistentId> findHistoricalVersionsOfNonDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) {
|
||||||
|
if(isEmptyQuery(theRemainingCount)){
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
|
||||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||||
|
|
||||||
Slice<Long> ids;
|
Slice<Long> ids;
|
||||||
|
@ -150,6 +154,10 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public List<ResourcePersistentId> findHistoricalVersionsOfDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) {
|
public List<ResourcePersistentId> findHistoricalVersionsOfDeletedResources(String theResourceName, ResourcePersistentId theResourceId, int theRemainingCount) {
|
||||||
|
if(isEmptyQuery(theRemainingCount)){
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
|
||||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||||
Slice<Long> ids;
|
Slice<Long> ids;
|
||||||
if (theResourceId != null) {
|
if (theResourceId != null) {
|
||||||
|
@ -172,7 +180,7 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
public void expungeCurrentVersionOfResources(RequestDetails theRequestDetails, List<ResourcePersistentId> theResourceIds, AtomicInteger theRemainingCount) {
|
public void expungeCurrentVersionOfResources(RequestDetails theRequestDetails, List<ResourcePersistentId> theResourceIds, AtomicInteger theRemainingCount) {
|
||||||
for (ResourcePersistentId next : theResourceIds) {
|
for (ResourcePersistentId next : theResourceIds) {
|
||||||
expungeCurrentVersionOfResource(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
expungeCurrentVersionOfResource(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
||||||
if (theRemainingCount.get() <= 0) {
|
if (expungeLimitReached(theRemainingCount)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,7 +238,7 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
public void expungeHistoricalVersionsOfIds(RequestDetails theRequestDetails, List<ResourcePersistentId> theResourceIds, AtomicInteger theRemainingCount) {
|
public void expungeHistoricalVersionsOfIds(RequestDetails theRequestDetails, List<ResourcePersistentId> theResourceIds, AtomicInteger theRemainingCount) {
|
||||||
for (ResourcePersistentId next : theResourceIds) {
|
for (ResourcePersistentId next : theResourceIds) {
|
||||||
expungeHistoricalVersionsOfId(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
expungeHistoricalVersionsOfId(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
||||||
if (theRemainingCount.get() <= 0) {
|
if (expungeLimitReached(theRemainingCount)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -241,7 +249,7 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
public void expungeHistoricalVersions(RequestDetails theRequestDetails, List<ResourcePersistentId> theHistoricalIds, AtomicInteger theRemainingCount) {
|
public void expungeHistoricalVersions(RequestDetails theRequestDetails, List<ResourcePersistentId> theHistoricalIds, AtomicInteger theRemainingCount) {
|
||||||
for (ResourcePersistentId next : theHistoricalIds) {
|
for (ResourcePersistentId next : theHistoricalIds) {
|
||||||
expungeHistoricalVersion(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
expungeHistoricalVersion(theRequestDetails, next.getIdAsLong(), theRemainingCount);
|
||||||
if (theRemainingCount.get() <= 0) {
|
if (expungeLimitReached(theRemainingCount)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -315,15 +323,21 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expungeHistoricalVersionsOfId(RequestDetails theRequestDetails, Long myResourceId, AtomicInteger theRemainingCount) {
|
private void expungeHistoricalVersionsOfId(RequestDetails theRequestDetails, Long myResourceId, AtomicInteger theRemainingCount) {
|
||||||
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new);
|
Pageable page;
|
||||||
|
synchronized (theRemainingCount){
|
||||||
|
if (expungeLimitReached(theRemainingCount)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
page = PageRequest.of(0, theRemainingCount.get());
|
||||||
|
}
|
||||||
|
|
||||||
Pageable page = PageRequest.of(0, theRemainingCount.get());
|
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new);
|
||||||
|
|
||||||
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
|
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
|
||||||
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
|
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
|
||||||
for (Long nextVersionId : versionIds) {
|
for (Long nextVersionId : versionIds) {
|
||||||
expungeHistoricalVersion(theRequestDetails, nextVersionId, theRemainingCount);
|
expungeHistoricalVersion(theRequestDetails, nextVersionId, theRemainingCount);
|
||||||
if (theRemainingCount.get() <= 0) {
|
if (expungeLimitReached(theRemainingCount)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -333,4 +347,12 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
Validate.notNull(myVersion);
|
Validate.notNull(myVersion);
|
||||||
return new SliceImpl<>(Collections.singletonList(myVersion.getId()));
|
return new SliceImpl<>(Collections.singletonList(myVersion.getId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isEmptyQuery(int theCount){
|
||||||
|
return theCount <= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean expungeLimitReached(AtomicInteger theRemainingCount) {
|
||||||
|
return theRemainingCount.get() <= 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,13 @@ import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||||
import ca.uhn.fhir.jpa.model.util.UcumServiceUtil;
|
import ca.uhn.fhir.jpa.model.util.UcumServiceUtil;
|
||||||
|
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||||
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
|
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
|
||||||
import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
|
import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
|
||||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||||
import ca.uhn.fhir.rest.api.Constants;
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||||
import ca.uhn.fhir.rest.param.TokenParam;
|
import ca.uhn.fhir.rest.param.TokenParam;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
|
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
|
||||||
|
@ -53,6 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
|
@ -774,4 +777,66 @@ public class ExpungeR4Test extends BaseResourceProviderR4Test {
|
||||||
// re-enable multi-delete for clean-up
|
// re-enable multi-delete for clean-up
|
||||||
myDaoConfig.setAllowMultipleDelete(true);
|
myDaoConfig.setAllowMultipleDelete(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpungeRaceConditionsWithLowThreadCountAndBatchSize() {
|
||||||
|
final SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
final int numPatients = 5;
|
||||||
|
myDaoConfig.setExpungeThreadCount(2);
|
||||||
|
myDaoConfig.setExpungeBatchSize(2);
|
||||||
|
|
||||||
|
List<Patient> patients = createPatientsWithForcedIds(numPatients);
|
||||||
|
patients = updatePatients(patients, 1);
|
||||||
|
deletePatients(patients);
|
||||||
|
|
||||||
|
int expectedPatientHistoryRecords = 15; // 5 resources x 3 versions
|
||||||
|
int actualPatientHistoryRecords = myPatientDao.history(null, null, null, requestDetails).getAllResources().size();
|
||||||
|
assertEquals(expectedPatientHistoryRecords, actualPatientHistoryRecords);
|
||||||
|
|
||||||
|
int expungeLimit = numPatients;
|
||||||
|
ExpungeOptions expungeOptions = new ExpungeOptions()
|
||||||
|
.setLimit(numPatients)
|
||||||
|
.setExpungeDeletedResources(true)
|
||||||
|
.setExpungeOldVersions(true);
|
||||||
|
|
||||||
|
myPatientDao.expunge(expungeOptions, requestDetails);
|
||||||
|
|
||||||
|
int maximumRemainingPatientHistoryRecords = expectedPatientHistoryRecords - expungeLimit;
|
||||||
|
int actualRemainingPatientHistoryRecords = myPatientDao.history(null, null, null, requestDetails).getAllResources().size();
|
||||||
|
|
||||||
|
// Note that the limit used in ExpungeOptions is meant to be a rough throttle.
|
||||||
|
// We care that AT LEAST the specified number of resources are expunged and not if the limit is exceeded.
|
||||||
|
assertTrue(actualRemainingPatientHistoryRecords <= maximumRemainingPatientHistoryRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Patient> createPatientsWithForcedIds(int theNumPatients) {
|
||||||
|
RequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
List<Patient> createdPatients = new ArrayList<>();
|
||||||
|
for(int i = 1; i <= theNumPatients; i++){
|
||||||
|
Patient patient = new Patient();
|
||||||
|
patient.setId("pt-00" + i);
|
||||||
|
patient.getNameFirstRep().addGiven("Patient-"+i);
|
||||||
|
Patient createdPatient = (Patient)myPatientDao.update(patient, requestDetails).getResource();
|
||||||
|
createdPatients.add(createdPatient);
|
||||||
|
}
|
||||||
|
return createdPatients;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Patient> updatePatients(List<Patient> thePatients, int theUpdateNumber) {
|
||||||
|
RequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
List<Patient> updatedPatients = new ArrayList<>();
|
||||||
|
for(Patient patient : thePatients){
|
||||||
|
patient.getNameFirstRep().addGiven("Update-" + theUpdateNumber);
|
||||||
|
Patient updatedPatient = (Patient)myPatientDao.update(patient, requestDetails).getResource();
|
||||||
|
updatedPatients.add(updatedPatient);
|
||||||
|
}
|
||||||
|
return updatedPatients;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deletePatients(List<Patient> thePatients){
|
||||||
|
RequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
for(Patient patient : thePatients){
|
||||||
|
myPatientDao.delete(patient.getIdElement(), requestDetails);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue