This commit is contained in:
Emre Dincturk 2023-12-07 17:16:00 -05:00
parent 303563079a
commit 56e88fee99
7 changed files with 99 additions and 151 deletions

View File

@ -58,7 +58,8 @@ public class Batch2SupportConfig {
DeleteExpungeSqlBuilder theDeleteExpungeSqlBuilder,
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc,
IHapiTransactionService theTransactionService) {
return new DeleteExpungeSvcImpl(theEntityManager, theDeleteExpungeSqlBuilder, theFullTextSearchSvc, theTransactionService);
return new DeleteExpungeSvcImpl(
theEntityManager, theDeleteExpungeSqlBuilder, theFullTextSearchSvc, theTransactionService);
}
@Bean

View File

@ -72,7 +72,8 @@ public class DeleteExpungeSqlBuilder {
}
ResourceForeignKey resourceTablePk = new ResourceForeignKey("HFJ_RESOURCE", "RES_ID");
List<String> rawSqlToDeleteResources= Collections.singletonList(deleteRecordsByColumnSql(pidListString, resourceTablePk));
List<String> rawSqlToDeleteResources =
Collections.singletonList(deleteRecordsByColumnSql(pidListString, resourceTablePk));
return new DeleteExpungeSqlResult(rawSqlToDeleteReferences, rawSqlToDeleteResources, pids.size());
}
@ -183,9 +184,10 @@ public class DeleteExpungeSqlBuilder {
private final List<String> mySqlStatementsToDeleteResources;
private final int myRecordCount;
public DeleteExpungeSqlResult(List<String> theSqlStatementsToDeleteReferences,
List<String> theSqlStatementsToDeleteResources,
int theRecordCount) {
public DeleteExpungeSqlResult(
List<String> theSqlStatementsToDeleteReferences,
List<String> theSqlStatementsToDeleteResources,
int theRecordCount) {
mySqlStatementsToDeleteReferences = theSqlStatementsToDeleteReferences;
mySqlStatementsToDeleteResources = theSqlStatementsToDeleteResources;
myRecordCount = theRecordCount;
@ -195,10 +197,10 @@ public class DeleteExpungeSqlBuilder {
return mySqlStatementsToDeleteReferences;
}
public List<String> getSqlStatementsToDeleteResources() {
return mySqlStatementsToDeleteResources;
}
public int getRecordCount() {
return myRecordCount;
}

View File

@ -30,7 +30,6 @@ import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionOperations;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.Collections;
@ -47,23 +46,22 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
private final IFulltextSearchSvc myFullTextSearchSvc;
public DeleteExpungeSvcImpl(
EntityManager theEntityManager,
DeleteExpungeSqlBuilder theDeleteExpungeSqlBuilder,
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc,
IHapiTransactionService theHapiTransactionService) {
EntityManager theEntityManager,
DeleteExpungeSqlBuilder theDeleteExpungeSqlBuilder,
@Autowired(required = false) IFulltextSearchSvc theFullTextSearchSvc,
IHapiTransactionService theHapiTransactionService) {
myEntityManager = theEntityManager;
myDeleteExpungeSqlBuilder = theDeleteExpungeSqlBuilder;
myFullTextSearchSvc = theFullTextSearchSvc;
myHapiTransactionService = theHapiTransactionService;
}
public int deleteExpungeSingleResource(JpaPid theJpaPid,
boolean theCascade,
Integer theCascadeMaxRounds) {
public int deleteExpungeSingleResource(JpaPid theJpaPid, boolean theCascade, Integer theCascadeMaxRounds) {
assert TransactionSynchronizationManager.isActualTransactionActive();
Long pid = theJpaPid.getId();
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(Collections.singleton(pid), theCascade, theCascadeMaxRounds);
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(
Collections.singleton(pid), theCascade, theCascadeMaxRounds);
executeSqlList(sqlResult.getSqlStatementsToDeleteReferences());
executeSqlList(sqlResult.getSqlStatementsToDeleteResources());
@ -72,36 +70,35 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
}
@Override
public int deleteExpungeBatch(List<JpaPid> theJpaPids,
boolean theCascade,
Integer theCascadeMaxRounds,
RequestDetails theRequestDetails) {
public int deleteExpungeBatch(
List<JpaPid> theJpaPids,
boolean theCascade,
Integer theCascadeMaxRounds,
RequestDetails theRequestDetails) {
//assert there is no active transaction
// assert there is no active transaction
assert !TransactionSynchronizationManager.isActualTransactionActive();
Set<Long> pids = JpaPid.toLongSet(theJpaPids);
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(pids, theCascade, theCascadeMaxRounds);
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(pids, theCascade, theCascadeMaxRounds);
IHapiTransactionService.IExecutionBuilder executionBuilder = myHapiTransactionService
.withRequest(theRequestDetails)
.withTransactionDetails(new TransactionDetails());
.withRequest(theRequestDetails)
.withTransactionDetails(new TransactionDetails());
executionBuilder.execute(() -> {
executeSqlList(sqlResult.getSqlStatementsToDeleteReferences())
executeSqlList(sqlResult.getSqlStatementsToDeleteReferences());
});
executionBuilder.execute(() -> {
executeSqlList(sqlResult.getSqlStatementsToDeleteResources());
});
return sqlResult.getRecordCount();
}
private int executeSqlList(List<String> sqlList) {
long totalDeleted = 0;
int totalDeleted = 0;
StopWatch sw = new StopWatch();
for (String sql : sqlList) {
@ -114,44 +111,7 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
}
ourLog.info("{} records deleted", totalDeleted);
}
private int deleteExpungeReferencesToResources(
List<JpaPid> theJpaPids) {
ourLog.info("Executing {} delete expunge sql commands to delete references ", sqlList.size());
// TODO KHS instead of logging progress, produce result chunks that get aggregated into a delete expunge report
}
@Override
public int deleteExpungeResources(List<JpaPid> theJpaPids, boolean theCascade, Integer theCascadeMaxRounds) {
Set<Long> pids = JpaPid.toLongSet(theJpaPids);
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeResourcesSql(pids);
List<String> sqlList = sqlResult.getSqlStatementsToDeleteReferences();
ourLog.info("Executing {} delete expunge sql commands", sqlList.size());
long totalDeleted = 0;
StopWatch sw = new StopWatch();
for (String sql : sqlList) {
sw.restart();
ourLog.info("Executing sql " + sql);
int deleted = myEntityManager.createNativeQuery(sql).executeUpdate();
ourLog.info("sql took " + StopWatch.formatMillis(sw.getMillis()));
ourLog.info("#deleted records:" + deleted);
totalDeleted += deleted;
}
ourLog.info("{} records deleted", totalDeleted);
clearHibernateSearchIndex(theJpaPids);
// TODO KHS instead of logging progress, produce result chunks that get aggregated into a delete expunge report
return sqlResult.getRecordCount();
return totalDeleted;
}
@Override
@ -166,7 +126,7 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
private void clearHibernateSearchIndex(List<JpaPid> thePersistentIds) {
if (myFullTextSearchSvc != null && !myFullTextSearchSvc.isDisabled()) {
List<Object> objectIds =
thePersistentIds.stream().map(JpaPid::getId).collect(Collectors.toList());
thePersistentIds.stream().map(JpaPid::getId).collect(Collectors.toList());
myFullTextSearchSvc.deleteIndexedDocumentsByTypeAndId(ResourceTable.class, objectIds);
ourLog.info("Cleared Hibernate Search indexes.");
}

View File

@ -67,9 +67,7 @@ public class DeleteExpungeAppCtx {
ResourceIdListWorkChunkJson.class,
new LoadIdsStep(theBatch2DaoSvc))
.addLastStep(
"expunge",
"Perform the resource expunge",
expungeStep(theDeleteExpungeSvc, theIdHelperService))
"expunge", "Perform the resource expunge", expungeStep(theDeleteExpungeSvc, theIdHelperService))
.build();
}
@ -85,9 +83,7 @@ public class DeleteExpungeAppCtx {
}
@Bean
public DeleteExpungeStep expungeStep(
IDeleteExpungeSvc theDeleteExpungeSvc,
IIdHelperService theIdHelperService) {
public DeleteExpungeStep expungeStep(IDeleteExpungeSvc theDeleteExpungeSvc, IIdHelperService theIdHelperService) {
return new DeleteExpungeStep(theDeleteExpungeSvc, theIdHelperService);
}

View File

@ -28,26 +28,23 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.List;
import javax.annotation.Nonnull;
public class DeleteExpungeStep
implements IJobStepWorker<DeleteExpungeJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
implements IJobStepWorker<DeleteExpungeJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(DeleteExpungeStep.class);
private final IDeleteExpungeSvc myDeleteExpungeSvc;
private final IIdHelperService myIdHelperService;
public DeleteExpungeStep(
IDeleteExpungeSvc theDeleteExpungeSvc,
IIdHelperService theIdHelperService) {
public DeleteExpungeStep(IDeleteExpungeSvc theDeleteExpungeSvc, IIdHelperService theIdHelperService) {
myDeleteExpungeSvc = theDeleteExpungeSvc;
myIdHelperService = theIdHelperService;
}
@ -55,37 +52,36 @@ public class DeleteExpungeStep
@Nonnull
@Override
public RunOutcome run(
@Nonnull
StepExecutionDetails<DeleteExpungeJobParameters, ResourceIdListWorkChunkJson>
theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
@Nonnull
StepExecutionDetails<DeleteExpungeJobParameters, ResourceIdListWorkChunkJson>
theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
boolean cascade = theStepExecutionDetails.getParameters().isCascade();
Integer cascadeMaxRounds = theStepExecutionDetails.getParameters().getCascadeMaxRounds();
return doDeleteExpunge(
data,
theStepExecutionDetails.getInstance().getInstanceId(),
theStepExecutionDetails.getChunkId(),
cascade,
cascadeMaxRounds);
data,
theStepExecutionDetails.getInstance().getInstanceId(),
theStepExecutionDetails.getChunkId(),
cascade,
cascadeMaxRounds);
}
@Nonnull
public RunOutcome doDeleteExpunge(
ResourceIdListWorkChunkJson theData,
String theInstanceId,
String theChunkId,
boolean theCascade,
Integer theCascadeMaxRounds) {
ResourceIdListWorkChunkJson theData,
String theInstanceId,
String theChunkId,
boolean theCascade,
Integer theCascadeMaxRounds) {
SystemRequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRequestPartitionId(theData.getRequestPartitionId());
DeleteExpungeJob job = new DeleteExpungeJob(
theData, theInstanceId, theChunkId, theCascade, theCascadeMaxRounds, requestDetails);
theData, theInstanceId, theChunkId, theCascade, theCascadeMaxRounds, requestDetails);
job.executeJob();
@ -102,12 +98,12 @@ public class DeleteExpungeStep
private final RequestDetails myRequestDetails;
public DeleteExpungeJob(
ResourceIdListWorkChunkJson theData,
String theInstanceId,
String theChunkId,
boolean theCascade,
Integer theCascadeMaxRounds,
RequestDetails theRequestDetails) {
ResourceIdListWorkChunkJson theData,
String theInstanceId,
String theChunkId,
boolean theCascade,
Integer theCascadeMaxRounds,
RequestDetails theRequestDetails) {
myData = theData;
myInstanceId = theInstanceId;
myChunkId = theChunkId;
@ -126,20 +122,21 @@ public class DeleteExpungeStep
if (persistentIds.isEmpty()) {
ourLog.info(
"Starting delete expunge work chunk. There are no resources to delete expunge - Instance[{}] Chunk[{}]",
myInstanceId,
myChunkId);
"Starting delete expunge work chunk. There are no resources to delete expunge - Instance[{}] Chunk[{}]",
myInstanceId,
myChunkId);
}
ourLog.info(
"Starting delete expunge work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
myInstanceId,
myChunkId);
"Starting delete expunge work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
myInstanceId,
myChunkId);
//not creating a transaction here, as deleteExpungeBatch manages
//its transactions internally to prevent deadlocks
myRecordCount = myDeleteExpungeSvc.deleteExpungeBatch(persistentIds, myCascade, myCascadeMaxRounds, myRequestDetails);
// not creating a transaction here, as deleteExpungeBatch manages
// its transactions internally to prevent deadlocks
myRecordCount = myDeleteExpungeSvc.deleteExpungeBatch(
persistentIds, myCascade, myCascadeMaxRounds, myRequestDetails);
}
}
}

View File

@ -40,9 +40,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@SuppressWarnings("rawtypes")
public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
@ -65,9 +65,9 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
@Nonnull StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
try {
// avoid double deletion of mdm links
@ -82,10 +82,11 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
}
@SuppressWarnings("unchecked")
private void runMmdClear(StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails) {
private void runMmdClear(
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails) {
List<? extends IResourcePersistentId> persistentIds =
theStepExecutionDetails.getData().getResourcePersistentIds(myIdHelperService);
theStepExecutionDetails.getData().getResourcePersistentIds(myIdHelperService);
if (persistentIds.isEmpty()) {
return;
@ -95,58 +96,48 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
requestDetails.setRetry(true);
requestDetails.setMaxRetries(100);
requestDetails.setRequestPartitionId(
theStepExecutionDetails.getParameters().getRequestPartitionId());
theStepExecutionDetails.getParameters().getRequestPartitionId());
String instanceId = theStepExecutionDetails.getInstance().getInstanceId();
String chunkId = theStepExecutionDetails.getChunkId();
ourLog.info(
"Starting mdm clear work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
instanceId,
chunkId);
"Starting mdm clear work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
instanceId,
chunkId);
StopWatch sw = new StopWatch();
myHapiTransactionService
.withRequest(requestDetails)
.execute(() ->
{
myMdmLinkSvc.deleteLinksWithAnyReferenceToPids(persistentIds);
ourLog.trace("Deleted {} mdm links in {}", persistentIds.size(),
StopWatch.formatMillis(sw.getMillis()));
}
);
myHapiTransactionService.withRequest(requestDetails).execute(() -> {
myMdmLinkSvc.deleteLinksWithAnyReferenceToPids(persistentIds);
ourLog.trace("Deleted {} mdm links in {}", persistentIds.size(), StopWatch.formatMillis(sw.getMillis()));
});
// use the expunge service to delete multiple resources at once efficiently
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
int deletedRecords = deleteExpungeSvc.deleteExpungeBatch(persistentIds, false, null, requestDetails);
ourLog.trace(
"Deleted {} of {} golden resources in {}",
deletedRecords,
persistentIds.size(),
StopWatch.formatMillis(sw.getMillis()));
"Deleted {} of {} golden resources in {}",
deletedRecords,
persistentIds.size(),
StopWatch.formatMillis(sw.getMillis()));
ourLog.info(
"Finished removing {} of {} golden resources in {} - {}/sec - Instance[{}] Chunk[{}]",
deletedRecords,
persistentIds.size(),
sw,
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
instanceId,
chunkId);
"Finished removing {} of {} golden resources in {} - {}/sec - Instance[{}] Chunk[{}]",
deletedRecords,
persistentIds.size(),
sw,
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
instanceId,
chunkId);
if (ourClearCompletionCallbackForUnitTest != null) {
ourClearCompletionCallbackForUnitTest.run();
}
}
@VisibleForTesting
public static void setClearCompletionCallbackForUnitTest(Runnable theClearCompletionCallbackForUnitTest) {
ourClearCompletionCallbackForUnitTest = theClearCompletionCallbackForUnitTest;

View File

@ -28,10 +28,11 @@ public interface IDeleteExpungeSvc<T extends IResourcePersistentId<?>> {
int deleteExpungeSingleResource(T thePersistentId, boolean theCascade, Integer theCascadeMaxRounds);
int deleteExpungeBatch(List<T> thePersistentIds, boolean theCascade, Integer theCascadeMaxRounds,
RequestDetails theRequestDetails);
int deleteExpungeBatch(
List<T> thePersistentIds,
boolean theCascade,
Integer theCascadeMaxRounds,
RequestDetails theRequestDetails);
boolean isCascadeSupported();
}