divide delete expunge into multiple transactions to prevent deadlocks
This commit is contained in:
parent
d8298e38d4
commit
cca241868d
|
@ -55,8 +55,10 @@ public class Batch2SupportConfig {
|
|||
public IDeleteExpungeSvc deleteExpungeSvc(
|
||||
EntityManager theEntityManager,
|
||||
DeleteExpungeSqlBuilder theDeleteExpungeSqlBuilder,
|
||||
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc) {
|
||||
return new DeleteExpungeSvcImpl(theEntityManager, theDeleteExpungeSqlBuilder, theFullTextSearchSvc);
|
||||
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc,
|
||||
IHapiTransactionService theTransactionService) {
|
||||
return new DeleteExpungeSvcImpl(
|
||||
theEntityManager, theDeleteExpungeSqlBuilder, theFullTextSearchSvc, theTransactionService);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
|
|
@ -58,24 +58,24 @@ public class DeleteExpungeSqlBuilder {
|
|||
|
||||
@Nonnull
|
||||
DeleteExpungeSqlResult convertPidsToDeleteExpungeSql(
|
||||
List<JpaPid> theJpaPids, boolean theCascade, Integer theCascadeMaxRounds) {
|
||||
Set<Long> pids, boolean theCascade, Integer theCascadeMaxRounds) {
|
||||
|
||||
Set<Long> pids = JpaPid.toLongSet(theJpaPids);
|
||||
validateOkToDeleteAndExpunge(pids, theCascade, theCascadeMaxRounds);
|
||||
|
||||
List<String> rawSql = new ArrayList<>();
|
||||
List<String> rawSqlToDeleteReferences = new ArrayList<>();
|
||||
|
||||
String pidListString = pids.toString().replace("[", "(").replace("]", ")");
|
||||
List<ResourceForeignKey> resourceForeignKeys = myResourceTableFKProvider.getResourceForeignKeys();
|
||||
|
||||
for (ResourceForeignKey resourceForeignKey : resourceForeignKeys) {
|
||||
rawSql.add(deleteRecordsByColumnSql(pidListString, resourceForeignKey));
|
||||
rawSqlToDeleteReferences.add(deleteRecordsByColumnSql(pidListString, resourceForeignKey));
|
||||
}
|
||||
|
||||
// Lastly we need to delete records from the resource table all of these other tables link to:
|
||||
ResourceForeignKey resourceTablePk = new ResourceForeignKey("HFJ_RESOURCE", "RES_ID");
|
||||
rawSql.add(deleteRecordsByColumnSql(pidListString, resourceTablePk));
|
||||
return new DeleteExpungeSqlResult(rawSql, pids.size());
|
||||
List<String> rawSqlToDeleteResources =
|
||||
Collections.singletonList(deleteRecordsByColumnSql(pidListString, resourceTablePk));
|
||||
|
||||
return new DeleteExpungeSqlResult(rawSqlToDeleteReferences, rawSqlToDeleteResources, pids.size());
|
||||
}
|
||||
|
||||
public void validateOkToDeleteAndExpunge(Set<Long> thePids, boolean theCascade, Integer theCascadeMaxRounds) {
|
||||
|
@ -179,21 +179,37 @@ public class DeleteExpungeSqlBuilder {
|
|||
}
|
||||
|
||||
public static class DeleteExpungeSqlResult {
|
||||
private List<String> mySqlStatementsToDeleteReferences;
|
||||
private List<String> mySqlStatementsToDeleteResources;
|
||||
private int myRecordCount;
|
||||
|
||||
private final List<String> mySqlStatements;
|
||||
private final int myRecordCount;
|
||||
public DeleteExpungeSqlResult() {}
|
||||
|
||||
public DeleteExpungeSqlResult(List<String> theSqlStatments, int theRecordCount) {
|
||||
mySqlStatements = theSqlStatments;
|
||||
public DeleteExpungeSqlResult(
|
||||
List<String> theSqlStatementsToDeleteReferences,
|
||||
List<String> theSqlStatementsToDeleteResources,
|
||||
int theRecordCount) {
|
||||
mySqlStatementsToDeleteReferences = theSqlStatementsToDeleteReferences;
|
||||
mySqlStatementsToDeleteResources = theSqlStatementsToDeleteResources;
|
||||
myRecordCount = theRecordCount;
|
||||
}
|
||||
|
||||
public List<String> getSqlStatements() {
|
||||
return mySqlStatements;
|
||||
public List<String> getSqlStatementsToDeleteReferences() {
|
||||
return mySqlStatementsToDeleteReferences;
|
||||
}
|
||||
|
||||
public List<String> getSqlStatementsToDeleteResources() {
|
||||
return mySqlStatementsToDeleteResources;
|
||||
}
|
||||
|
||||
public int getRecordCount() {
|
||||
return myRecordCount;
|
||||
}
|
||||
|
||||
public void setFieldsFromOther(DeleteExpungeSqlResult theOtherResult) {
|
||||
this.mySqlStatementsToDeleteReferences = theOtherResult.mySqlStatementsToDeleteReferences;
|
||||
this.mySqlStatementsToDeleteResources = theOtherResult.mySqlStatementsToDeleteResources;
|
||||
this.myRecordCount = theOtherResult.myRecordCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,19 +21,26 @@ package ca.uhn.fhir.jpa.delete.batch2;
|
|||
|
||||
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
|
||||
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
|
||||
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
|
||||
import ca.uhn.fhir.jpa.model.dao.JpaPid;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import jakarta.persistence.EntityManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(DeleteExpungeSvcImpl.class);
|
||||
|
||||
private final IHapiTransactionService myHapiTransactionService;
|
||||
private final EntityManager myEntityManager;
|
||||
private final DeleteExpungeSqlBuilder myDeleteExpungeSqlBuilder;
|
||||
private final IFulltextSearchSvc myFullTextSearchSvc;
|
||||
|
@ -41,30 +48,74 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
|
|||
public DeleteExpungeSvcImpl(
|
||||
EntityManager theEntityManager,
|
||||
DeleteExpungeSqlBuilder theDeleteExpungeSqlBuilder,
|
||||
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc) {
|
||||
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc,
|
||||
IHapiTransactionService theHapiTransactionService) {
|
||||
myEntityManager = theEntityManager;
|
||||
myDeleteExpungeSqlBuilder = theDeleteExpungeSqlBuilder;
|
||||
myFullTextSearchSvc = theFullTextSearchSvc;
|
||||
myHapiTransactionService = theHapiTransactionService;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public int deleteExpungeSingleResource(JpaPid theJpaPid, boolean theCascade, Integer theCascadeMaxRounds) {
|
||||
Long pid = theJpaPid.getId();
|
||||
|
||||
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
|
||||
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(
|
||||
Collections.singleton(pid), theCascade, theCascadeMaxRounds);
|
||||
|
||||
executeSqlList(sqlResult.getSqlStatementsToDeleteReferences());
|
||||
executeSqlList(sqlResult.getSqlStatementsToDeleteResources());
|
||||
|
||||
clearHibernateSearchIndex(Collections.singletonList(theJpaPid));
|
||||
return sqlResult.getRecordCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteExpunge(List<JpaPid> theJpaPids, boolean theCascade, Integer theCascadeMaxRounds) {
|
||||
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
|
||||
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(theJpaPids, theCascade, theCascadeMaxRounds);
|
||||
List<String> sqlList = sqlResult.getSqlStatements();
|
||||
public int deleteExpungeBatch(
|
||||
List<JpaPid> theJpaPids,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds,
|
||||
RequestDetails theRequestDetails) {
|
||||
|
||||
// assert there is no active transaction
|
||||
assert !TransactionSynchronizationManager.isActualTransactionActive();
|
||||
Set<Long> pids = JpaPid.toLongSet(theJpaPids);
|
||||
|
||||
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResultOutsideLambda =
|
||||
new DeleteExpungeSqlBuilder.DeleteExpungeSqlResult();
|
||||
|
||||
myHapiTransactionService
|
||||
.withRequest(theRequestDetails)
|
||||
.withTransactionDetails(new TransactionDetails())
|
||||
.execute(() -> {
|
||||
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
|
||||
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(
|
||||
pids, theCascade, theCascadeMaxRounds);
|
||||
executeSqlList(sqlResult.getSqlStatementsToDeleteReferences());
|
||||
sqlResultOutsideLambda.setFieldsFromOther(sqlResult);
|
||||
});
|
||||
|
||||
myHapiTransactionService
|
||||
.withRequest(theRequestDetails)
|
||||
.withTransactionDetails(new TransactionDetails())
|
||||
.execute(() -> {
|
||||
executeSqlList(sqlResultOutsideLambda.getSqlStatementsToDeleteResources());
|
||||
clearHibernateSearchIndex(theJpaPids);
|
||||
});
|
||||
|
||||
return sqlResultOutsideLambda.getRecordCount();
|
||||
}
|
||||
|
||||
private int executeSqlList(List<String> sqlList) {
|
||||
int totalDeleted = 0;
|
||||
|
||||
ourLog.debug("Executing {} delete expunge sql commands", sqlList.size());
|
||||
long totalDeleted = 0;
|
||||
for (String sql : sqlList) {
|
||||
ourLog.trace("Executing sql " + sql);
|
||||
totalDeleted += myEntityManager.createNativeQuery(sql).executeUpdate();
|
||||
}
|
||||
|
||||
ourLog.info("{} records deleted", totalDeleted);
|
||||
clearHibernateSearchIndex(theJpaPids);
|
||||
|
||||
// TODO KHS instead of logging progress, produce result chunks that get aggregated into a delete expunge report
|
||||
return sqlResult.getRecordCount();
|
||||
return totalDeleted;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package ca.uhn.fhir.mdm.batch2.clear;
|
||||
|
||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||
|
@ -10,15 +11,13 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
|||
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
|
||||
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
|
||||
import ca.uhn.fhir.model.primitive.IdDt;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.hl7.fhir.r4.model.Reference;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import jakarta.annotation.Nonnull;
|
||||
|
@ -110,11 +109,9 @@ class MdmClearStepTest extends BaseMdmR4Test {
|
|||
ResourceIdListWorkChunkJson chunk = new ResourceIdListWorkChunkJson();
|
||||
chunk.addTypedPid("Patient", myGoldenPid);
|
||||
|
||||
RequestDetails requestDetails = new SystemRequestDetails();
|
||||
TransactionDetails transactionDetails = new TransactionDetails();
|
||||
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = buildStepExecutionDetails(chunk);
|
||||
|
||||
myMdmClearStep.myHapiTransactionService.execute(requestDetails, transactionDetails, myMdmClearStep.buildJob(requestDetails, transactionDetails, stepExecutionDetails));
|
||||
myMdmClearStep.run(stepExecutionDetails, Mockito.mock(IJobDataSink.class));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
|
|
|
@ -863,7 +863,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
|
|||
|
||||
// Test
|
||||
myCaptureQueriesListener.clear();
|
||||
RunOutcome outcome = myDeleteExpungeStep.doDeleteExpunge(new ResourceIdListWorkChunkJson(pids, null), sink, "instance-id", "chunk-id", false, null);
|
||||
RunOutcome outcome = myDeleteExpungeStep.doDeleteExpunge(new ResourceIdListWorkChunkJson(pids, null), "instance-id", "chunk-id", false, null);
|
||||
|
||||
// Verify
|
||||
assertEquals(1, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
|
||||
|
|
|
@ -304,7 +304,7 @@ public class MdmStorageInterceptor implements IMdmStorageInterceptor {
|
|||
|
||||
private int deleteExpungeGoldenResource(IResourcePersistentId theGoldenPid) {
|
||||
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
|
||||
return deleteExpungeSvc.deleteExpunge(new ArrayList<>(Collections.singleton(theGoldenPid)), false, null);
|
||||
return deleteExpungeSvc.deleteExpungeSingleResource(theGoldenPid, false, null);
|
||||
}
|
||||
|
||||
private void forbidIfModifyingExternalEidOnTarget(IBaseResource theNewResource, IBaseResource theOldResource) {
|
||||
|
|
|
@ -67,9 +67,7 @@ public class DeleteExpungeAppCtx {
|
|||
ResourceIdListWorkChunkJson.class,
|
||||
new LoadIdsStep(theBatch2DaoSvc))
|
||||
.addLastStep(
|
||||
"expunge",
|
||||
"Perform the resource expunge",
|
||||
expungeStep(theHapiTransactionService, theDeleteExpungeSvc, theIdHelperService))
|
||||
"expunge", "Perform the resource expunge", expungeStep(theDeleteExpungeSvc, theIdHelperService))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -85,11 +83,8 @@ public class DeleteExpungeAppCtx {
|
|||
}
|
||||
|
||||
@Bean
|
||||
public DeleteExpungeStep expungeStep(
|
||||
HapiTransactionService theHapiTransactionService,
|
||||
IDeleteExpungeSvc theDeleteExpungeSvc,
|
||||
IIdHelperService theIdHelperService) {
|
||||
return new DeleteExpungeStep(theHapiTransactionService, theDeleteExpungeSvc, theIdHelperService);
|
||||
public DeleteExpungeStep expungeStep(IDeleteExpungeSvc theDeleteExpungeSvc, IIdHelperService theIdHelperService) {
|
||||
return new DeleteExpungeStep(theDeleteExpungeSvc, theIdHelperService);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
|
|
@ -19,20 +19,21 @@
|
|||
*/
|
||||
package ca.uhn.fhir.batch2.jobs.expunge;
|
||||
|
||||
import ca.uhn.fhir.batch2.api.*;
|
||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
||||
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
|
||||
import ca.uhn.fhir.batch2.api.RunOutcome;
|
||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
|
||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||
import ca.uhn.fhir.jpa.model.dao.JpaPid;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -40,15 +41,10 @@ public class DeleteExpungeStep
|
|||
implements IJobStepWorker<DeleteExpungeJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(DeleteExpungeStep.class);
|
||||
private final HapiTransactionService myHapiTransactionService;
|
||||
private final IDeleteExpungeSvc myDeleteExpungeSvc;
|
||||
private final IIdHelperService myIdHelperService;
|
||||
|
||||
public DeleteExpungeStep(
|
||||
HapiTransactionService theHapiTransactionService,
|
||||
IDeleteExpungeSvc theDeleteExpungeSvc,
|
||||
IIdHelperService theIdHelperService) {
|
||||
myHapiTransactionService = theHapiTransactionService;
|
||||
public DeleteExpungeStep(IDeleteExpungeSvc theDeleteExpungeSvc, IIdHelperService theIdHelperService) {
|
||||
myDeleteExpungeSvc = theDeleteExpungeSvc;
|
||||
myIdHelperService = theIdHelperService;
|
||||
}
|
||||
|
@ -68,7 +64,6 @@ public class DeleteExpungeStep
|
|||
Integer cascadeMaxRounds = theStepExecutionDetails.getParameters().getCascadeMaxRounds();
|
||||
return doDeleteExpunge(
|
||||
data,
|
||||
theDataSink,
|
||||
theStepExecutionDetails.getInstance().getInstanceId(),
|
||||
theStepExecutionDetails.getChunkId(),
|
||||
cascade,
|
||||
|
@ -78,67 +73,50 @@ public class DeleteExpungeStep
|
|||
@Nonnull
|
||||
public RunOutcome doDeleteExpunge(
|
||||
ResourceIdListWorkChunkJson theData,
|
||||
IJobDataSink<VoidModel> theDataSink,
|
||||
String theInstanceId,
|
||||
String theChunkId,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds) {
|
||||
RequestDetails requestDetails = new SystemRequestDetails();
|
||||
TransactionDetails transactionDetails = new TransactionDetails();
|
||||
SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||
requestDetails.setRequestPartitionId(theData.getRequestPartitionId());
|
||||
|
||||
DeleteExpungeJob job = new DeleteExpungeJob(
|
||||
theData,
|
||||
requestDetails,
|
||||
transactionDetails,
|
||||
theDataSink,
|
||||
theInstanceId,
|
||||
theChunkId,
|
||||
theCascade,
|
||||
theCascadeMaxRounds);
|
||||
myHapiTransactionService
|
||||
.withRequest(requestDetails)
|
||||
.withTransactionDetails(transactionDetails)
|
||||
.withRequestPartitionId(theData.getRequestPartitionId())
|
||||
.execute(job);
|
||||
theData, theInstanceId, theChunkId, theCascade, theCascadeMaxRounds, requestDetails);
|
||||
|
||||
job.executeJob();
|
||||
|
||||
return new RunOutcome(job.getRecordCount());
|
||||
}
|
||||
|
||||
private class DeleteExpungeJob implements TransactionCallback<Void> {
|
||||
private class DeleteExpungeJob {
|
||||
private final ResourceIdListWorkChunkJson myData;
|
||||
private final RequestDetails myRequestDetails;
|
||||
private final TransactionDetails myTransactionDetails;
|
||||
private final IJobDataSink<VoidModel> myDataSink;
|
||||
private final String myChunkId;
|
||||
private final String myInstanceId;
|
||||
private final boolean myCascade;
|
||||
private final Integer myCascadeMaxRounds;
|
||||
private int myRecordCount;
|
||||
private final RequestDetails myRequestDetails;
|
||||
|
||||
public DeleteExpungeJob(
|
||||
ResourceIdListWorkChunkJson theData,
|
||||
RequestDetails theRequestDetails,
|
||||
TransactionDetails theTransactionDetails,
|
||||
IJobDataSink<VoidModel> theDataSink,
|
||||
String theInstanceId,
|
||||
String theChunkId,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds) {
|
||||
Integer theCascadeMaxRounds,
|
||||
RequestDetails theRequestDetails) {
|
||||
myData = theData;
|
||||
myRequestDetails = theRequestDetails;
|
||||
myTransactionDetails = theTransactionDetails;
|
||||
myDataSink = theDataSink;
|
||||
myInstanceId = theInstanceId;
|
||||
myChunkId = theChunkId;
|
||||
myCascade = theCascade;
|
||||
myCascadeMaxRounds = theCascadeMaxRounds;
|
||||
myRequestDetails = theRequestDetails;
|
||||
}
|
||||
|
||||
public int getRecordCount() {
|
||||
return myRecordCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
|
||||
public void executeJob() {
|
||||
|
||||
List<JpaPid> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
|
||||
|
||||
|
@ -147,7 +125,6 @@ public class DeleteExpungeStep
|
|||
"Starting delete expunge work chunk. There are no resources to delete expunge - Instance[{}] Chunk[{}]",
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
return null;
|
||||
}
|
||||
|
||||
ourLog.info(
|
||||
|
@ -156,9 +133,10 @@ public class DeleteExpungeStep
|
|||
myInstanceId,
|
||||
myChunkId);
|
||||
|
||||
myRecordCount = myDeleteExpungeSvc.deleteExpunge(persistentIds, myCascade, myCascadeMaxRounds);
|
||||
|
||||
return null;
|
||||
// not creating a transaction here, as deleteExpungeBatch manages
|
||||
// its transactions internally to prevent deadlocks
|
||||
myRecordCount = myDeleteExpungeSvc.deleteExpungeBatch(
|
||||
persistentIds, myCascade, myCascadeMaxRounds, myRequestDetails);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,18 +32,14 @@ import ca.uhn.fhir.jpa.api.svc.IMdmClearHelperSvc;
|
|||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||
import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
|
||||
import ca.uhn.fhir.mdm.interceptor.MdmStorageInterceptor;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -72,100 +68,73 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
@Nonnull StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
|
||||
@Nonnull IJobDataSink<VoidModel> theDataSink)
|
||||
throws JobExecutionFailedException {
|
||||
|
||||
try {
|
||||
// avoid double deletion of mdm links
|
||||
MdmStorageInterceptor.setLinksDeletedBeforehand();
|
||||
|
||||
runMmdClear(theStepExecutionDetails);
|
||||
return new RunOutcome(theStepExecutionDetails.getData().size());
|
||||
|
||||
} finally {
|
||||
MdmStorageInterceptor.resetLinksDeletedBeforehand();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void runMmdClear(
|
||||
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails) {
|
||||
|
||||
List<? extends IResourcePersistentId> persistentIds =
|
||||
theStepExecutionDetails.getData().getResourcePersistentIds(myIdHelperService);
|
||||
|
||||
if (persistentIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||
requestDetails.setRetry(true);
|
||||
requestDetails.setMaxRetries(100);
|
||||
requestDetails.setRequestPartitionId(
|
||||
theStepExecutionDetails.getParameters().getRequestPartitionId());
|
||||
TransactionDetails transactionDetails = new TransactionDetails();
|
||||
myHapiTransactionService.execute(
|
||||
requestDetails,
|
||||
transactionDetails,
|
||||
buildJob(requestDetails, transactionDetails, theStepExecutionDetails));
|
||||
|
||||
return new RunOutcome(theStepExecutionDetails.getData().size());
|
||||
}
|
||||
String instanceId = theStepExecutionDetails.getInstance().getInstanceId();
|
||||
|
||||
MdmClearJob buildJob(
|
||||
RequestDetails requestDetails,
|
||||
TransactionDetails transactionDetails,
|
||||
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails) {
|
||||
return new MdmClearJob(requestDetails, transactionDetails, theStepExecutionDetails);
|
||||
}
|
||||
String chunkId = theStepExecutionDetails.getChunkId();
|
||||
|
||||
class MdmClearJob implements TransactionCallback<Void> {
|
||||
private final RequestDetails myRequestDetails;
|
||||
private final TransactionDetails myTransactionDetails;
|
||||
private final ResourceIdListWorkChunkJson myData;
|
||||
private final String myChunkId;
|
||||
private final String myInstanceId;
|
||||
ourLog.info(
|
||||
"Starting mdm clear work chunk with {} resources - Instance[{}] Chunk[{}]",
|
||||
persistentIds.size(),
|
||||
instanceId,
|
||||
chunkId);
|
||||
|
||||
public MdmClearJob(
|
||||
RequestDetails theRequestDetails,
|
||||
TransactionDetails theTransactionDetails,
|
||||
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails) {
|
||||
myRequestDetails = theRequestDetails;
|
||||
myTransactionDetails = theTransactionDetails;
|
||||
myData = theStepExecutionDetails.getData();
|
||||
myInstanceId = theStepExecutionDetails.getInstance().getInstanceId();
|
||||
myChunkId = theStepExecutionDetails.getChunkId();
|
||||
}
|
||||
StopWatch sw = new StopWatch();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
|
||||
List<? extends IResourcePersistentId> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
|
||||
if (persistentIds.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
myHapiTransactionService.withRequest(requestDetails).execute(() -> {
|
||||
myMdmLinkSvc.deleteLinksWithAnyReferenceToPids(persistentIds);
|
||||
ourLog.trace("Deleted {} mdm links in {}", persistentIds.size(), StopWatch.formatMillis(sw.getMillis()));
|
||||
});
|
||||
|
||||
// avoid double deletion of mdm links
|
||||
MdmStorageInterceptor.setLinksDeletedBeforehand();
|
||||
// use the expunge service to delete multiple resources at once efficiently
|
||||
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
|
||||
int deletedRecords = deleteExpungeSvc.deleteExpungeBatch(persistentIds, false, null, requestDetails);
|
||||
ourLog.trace(
|
||||
"Deleted {} of {} golden resources in {}",
|
||||
deletedRecords,
|
||||
persistentIds.size(),
|
||||
StopWatch.formatMillis(sw.getMillis()));
|
||||
|
||||
try {
|
||||
performWork(persistentIds);
|
||||
ourLog.info(
|
||||
"Finished removing {} of {} golden resources in {} - {}/sec - Instance[{}] Chunk[{}]",
|
||||
deletedRecords,
|
||||
persistentIds.size(),
|
||||
sw,
|
||||
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
|
||||
instanceId,
|
||||
chunkId);
|
||||
|
||||
} finally {
|
||||
MdmStorageInterceptor.resetLinksDeletedBeforehand();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
private void performWork(List<? extends IResourcePersistentId> thePersistentIds) {
|
||||
ourLog.info(
|
||||
"Starting mdm clear work chunk with {} resources - Instance[{}] Chunk[{}]",
|
||||
thePersistentIds.size(),
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
StopWatch sw = new StopWatch();
|
||||
|
||||
myMdmLinkSvc.deleteLinksWithAnyReferenceToPids(thePersistentIds);
|
||||
ourLog.trace("Deleted {} mdm links in {}", thePersistentIds.size(), StopWatch.formatMillis(sw.getMillis()));
|
||||
|
||||
// use the expunge service to delete multiple resources at once efficiently
|
||||
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
|
||||
int deletedRecords = deleteExpungeSvc.deleteExpunge(thePersistentIds, false, null);
|
||||
|
||||
ourLog.trace(
|
||||
"Deleted {} of {} golden resources in {}",
|
||||
deletedRecords,
|
||||
thePersistentIds.size(),
|
||||
StopWatch.formatMillis(sw.getMillis()));
|
||||
|
||||
ourLog.info(
|
||||
"Finished removing {} of {} golden resources in {} - {}/sec - Instance[{}] Chunk[{}]",
|
||||
deletedRecords,
|
||||
thePersistentIds.size(),
|
||||
sw,
|
||||
sw.formatThroughput(thePersistentIds.size(), TimeUnit.SECONDS),
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
|
||||
if (ourClearCompletionCallbackForUnitTest != null) {
|
||||
ourClearCompletionCallbackForUnitTest.run();
|
||||
}
|
||||
if (ourClearCompletionCallbackForUnitTest != null) {
|
||||
ourClearCompletionCallbackForUnitTest.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,13 +19,20 @@
|
|||
*/
|
||||
package ca.uhn.fhir.jpa.api.svc;
|
||||
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface IDeleteExpungeSvc<T extends IResourcePersistentId<?>> {
|
||||
|
||||
int deleteExpunge(List<T> thePersistentIds, boolean theCascade, Integer theCascadeMaxRounds);
|
||||
int deleteExpungeSingleResource(T thePersistentId, boolean theCascade, Integer theCascadeMaxRounds);
|
||||
|
||||
int deleteExpungeBatch(
|
||||
List<T> thePersistentIds,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds,
|
||||
RequestDetails theRequestDetails);
|
||||
|
||||
boolean isCascadeSupported();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue