pass the whole work chunk instead of just the ID
This commit is contained in:
parent
6772b2160f
commit
72a837cb10
|
@ -19,10 +19,12 @@
|
|||
*/
|
||||
package ca.uhn.fhir.jpa.delete.batch2;
|
||||
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
|
||||
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
|
||||
import ca.uhn.fhir.jpa.model.dao.JpaPid;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import ca.uhn.fhir.model.api.IModelJson;
|
||||
import jakarta.persistence.EntityManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -48,13 +50,16 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int deleteExpunge(
|
||||
List<JpaPid> theJpaPids, boolean theCascade, Integer theCascadeMaxRounds, String theChunkId) {
|
||||
public <PT extends IModelJson> int deleteExpunge(
|
||||
List<JpaPid> theJpaPids, boolean theCascade, Integer theCascadeMaxRounds, PT theWorkChunk) {
|
||||
DeleteExpungeSqlBuilder.DeleteExpungeSqlResult sqlResult =
|
||||
myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(theJpaPids, theCascade, theCascadeMaxRounds);
|
||||
List<String> sqlList = sqlResult.getSqlStatements();
|
||||
|
||||
String formattedChunkIdForLogMessage = theChunkId.isBlank() ? "" : "Chunk[" + theChunkId + "] - ";
|
||||
String formattedChunkIdForLogMessage = "";
|
||||
if (theWorkChunk instanceof WorkChunk && !((WorkChunk) theWorkChunk).getId().isBlank()) {
|
||||
formattedChunkIdForLogMessage = "Chunk[" + ((WorkChunk) theWorkChunk).getId() + "] - ";
|
||||
}
|
||||
|
||||
ourLog.debug("{}Executing {} delete expunge sql commands", formattedChunkIdForLogMessage, sqlList.size());
|
||||
long totalDeleted = 0;
|
||||
|
@ -63,10 +68,7 @@ public class DeleteExpungeSvcImpl implements IDeleteExpungeSvc<JpaPid> {
|
|||
totalDeleted += myEntityManager.createNativeQuery(sql).executeUpdate();
|
||||
}
|
||||
|
||||
ourLog.info(
|
||||
"{}Delete expunge sql commands affected {} rows",
|
||||
formattedChunkIdForLogMessage,
|
||||
totalDeleted);
|
||||
ourLog.info("{}Delete expunge sql commands affected {} rows", formattedChunkIdForLogMessage, totalDeleted);
|
||||
clearHibernateSearchIndex(theJpaPids);
|
||||
|
||||
// TODO KHS instead of logging progress, produce result chunks that get aggregated into a delete expunge report
|
||||
|
|
|
@ -1408,7 +1408,7 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test {
|
|||
final List<JpaPid> pidOrThrowException1 = List.of(pidOrThrowException);
|
||||
|
||||
final TransactionTemplate transactionTemplate = new TransactionTemplate(getTxManager());
|
||||
transactionTemplate.execute(x -> myDeleteExpungeSvc.deleteExpunge(pidOrThrowException1, true, 10, ""));
|
||||
transactionTemplate.execute(x -> myDeleteExpungeSvc.deleteExpunge(pidOrThrowException1, true, 10, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
|
|||
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep;
|
||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.support.ValidationSupportContext;
|
||||
import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
|
||||
|
@ -853,9 +854,12 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
|
|||
|
||||
IJobDataSink<VoidModel> sink = mock(IJobDataSink.class);
|
||||
|
||||
WorkChunk workChunk = mock(WorkChunk.class);
|
||||
when(workChunk.getId()).thenReturn("chunk-id");
|
||||
|
||||
// Test
|
||||
myCaptureQueriesListener.clear();
|
||||
RunOutcome outcome = myDeleteExpungeStep.doDeleteExpunge(new ResourceIdListWorkChunkJson(pids, null), sink, "instance-id", "chunk-id", false, null);
|
||||
RunOutcome outcome = myDeleteExpungeStep.doDeleteExpunge(new ResourceIdListWorkChunkJson(pids, null), sink, "instance-id", workChunk, false, null);
|
||||
|
||||
// Verify
|
||||
assertEquals(1, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package ca.uhn.fhir.jpa.delete;
|
||||
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
|
||||
import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSqlBuilder;
|
||||
import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSvcImpl;
|
||||
|
@ -48,8 +49,10 @@ public class DeleteExpungeSvcImplTest {
|
|||
logger.addAppender(myAppender);
|
||||
|
||||
when(myDeleteExpungeSqlBuilder.convertPidsToDeleteExpungeSql(Collections.emptyList(), false, 1)).thenReturn(mock(DeleteExpungeSqlBuilder.DeleteExpungeSqlResult.class));
|
||||
WorkChunk workChunk = mock(WorkChunk.class);
|
||||
when(workChunk.getId()).thenReturn("abc-123");
|
||||
|
||||
myDeleteExpungeSvc.deleteExpunge(Collections.emptyList(), false, 1, "abc-123");
|
||||
myDeleteExpungeSvc.deleteExpunge(Collections.emptyList(), false, 1, workChunk);
|
||||
verify(myAppender, atLeastOnce()).doAppend(myLoggingEvent.capture());
|
||||
List<ILoggingEvent> events = myLoggingEvent.getAllValues();
|
||||
assertEquals(Level.INFO, events.get(0).getLevel());
|
||||
|
|
|
@ -352,7 +352,7 @@ public class MdmStorageInterceptor implements IMdmStorageInterceptor {
|
|||
@SuppressWarnings("unchecked")
|
||||
private int deleteExpungeGoldenResource(IResourcePersistentId theGoldenPid) {
|
||||
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
|
||||
return deleteExpungeSvc.deleteExpunge(new ArrayList<>(Collections.singleton(theGoldenPid)), false, null, "");
|
||||
return deleteExpungeSvc.deleteExpunge(new ArrayList<>(Collections.singleton(theGoldenPid)), false, null, null);
|
||||
}
|
||||
|
||||
private void forbidIfModifyingExternalEidOnTarget(IBaseResource theNewResource, IBaseResource theOldResource) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
|
|||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
|
||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||
|
@ -75,7 +76,7 @@ public class DeleteExpungeStep
|
|||
data,
|
||||
theDataSink,
|
||||
theStepExecutionDetails.getInstance().getInstanceId(),
|
||||
theStepExecutionDetails.getChunkId(),
|
||||
theStepExecutionDetails.getWorkChunk(),
|
||||
cascade,
|
||||
cascadeMaxRounds);
|
||||
}
|
||||
|
@ -85,7 +86,7 @@ public class DeleteExpungeStep
|
|||
ResourceIdListWorkChunkJson theData,
|
||||
IJobDataSink<VoidModel> theDataSink,
|
||||
String theInstanceId,
|
||||
String theChunkId,
|
||||
WorkChunk theWorkChunk,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds) {
|
||||
RequestDetails requestDetails = new SystemRequestDetails();
|
||||
|
@ -96,7 +97,7 @@ public class DeleteExpungeStep
|
|||
transactionDetails,
|
||||
theDataSink,
|
||||
theInstanceId,
|
||||
theChunkId,
|
||||
theWorkChunk,
|
||||
theCascade,
|
||||
theCascadeMaxRounds);
|
||||
myHapiTransactionService
|
||||
|
@ -113,7 +114,7 @@ public class DeleteExpungeStep
|
|||
private final RequestDetails myRequestDetails;
|
||||
private final TransactionDetails myTransactionDetails;
|
||||
private final IJobDataSink<VoidModel> myDataSink;
|
||||
private final String myChunkId;
|
||||
private final WorkChunk myWorkChunk;
|
||||
private final String myInstanceId;
|
||||
private final boolean myCascade;
|
||||
private final Integer myCascadeMaxRounds;
|
||||
|
@ -125,7 +126,7 @@ public class DeleteExpungeStep
|
|||
TransactionDetails theTransactionDetails,
|
||||
IJobDataSink<VoidModel> theDataSink,
|
||||
String theInstanceId,
|
||||
String theChunkId,
|
||||
WorkChunk theWorkChunk,
|
||||
boolean theCascade,
|
||||
Integer theCascadeMaxRounds) {
|
||||
myData = theData;
|
||||
|
@ -133,7 +134,7 @@ public class DeleteExpungeStep
|
|||
myTransactionDetails = theTransactionDetails;
|
||||
myDataSink = theDataSink;
|
||||
myInstanceId = theInstanceId;
|
||||
myChunkId = theChunkId;
|
||||
myWorkChunk = theWorkChunk;
|
||||
myCascade = theCascade;
|
||||
myCascadeMaxRounds = theCascadeMaxRounds;
|
||||
}
|
||||
|
@ -146,12 +147,13 @@ public class DeleteExpungeStep
|
|||
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
|
||||
|
||||
List<JpaPid> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
|
||||
String workChunkId = myWorkChunk.getId();
|
||||
|
||||
if (persistentIds.isEmpty()) {
|
||||
ourLog.info(
|
||||
"Starting delete expunge work chunk. There are no resources to delete expunge - Instance[{}] Chunk[{}]",
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
workChunkId);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -159,15 +161,15 @@ public class DeleteExpungeStep
|
|||
"Starting delete expunge work chunk with {} resources - Instance[{}] Chunk[{}]",
|
||||
persistentIds.size(),
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
workChunkId);
|
||||
|
||||
myRecordCount = myDeleteExpungeSvc.deleteExpunge(persistentIds, myCascade, myCascadeMaxRounds, myChunkId);
|
||||
myRecordCount = myDeleteExpungeSvc.deleteExpunge(persistentIds, myCascade, myCascadeMaxRounds, myWorkChunk);
|
||||
|
||||
ourLog.info(
|
||||
"Delete expunge finished deleting {} resources - Instance[{}] Chunk[{}]",
|
||||
myRecordCount,
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
workChunkId);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
|
|||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
|
||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||
import ca.uhn.fhir.jpa.api.svc.IMdmClearHelperSvc;
|
||||
|
@ -96,7 +97,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
private final RequestDetails myRequestDetails;
|
||||
private final TransactionDetails myTransactionDetails;
|
||||
private final ResourceIdListWorkChunkJson myData;
|
||||
private final String myChunkId;
|
||||
private final WorkChunk myWorkChunk;
|
||||
private final String myInstanceId;
|
||||
|
||||
public MdmClearJob(
|
||||
|
@ -107,7 +108,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
myTransactionDetails = theTransactionDetails;
|
||||
myData = theStepExecutionDetails.getData();
|
||||
myInstanceId = theStepExecutionDetails.getInstance().getInstanceId();
|
||||
myChunkId = theStepExecutionDetails.getChunkId();
|
||||
myWorkChunk = theStepExecutionDetails.getWorkChunk();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -137,7 +138,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
"Starting mdm clear work chunk with {} resources - Instance[{}] Chunk[{}]",
|
||||
thePersistentIds.size(),
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
myWorkChunk);
|
||||
StopWatch sw = new StopWatch();
|
||||
|
||||
myMdmLinkSvc.deleteLinksWithAnyReferenceToPids(thePersistentIds);
|
||||
|
@ -145,7 +146,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
|
||||
// use the expunge service to delete multiple resources at once efficiently
|
||||
IDeleteExpungeSvc deleteExpungeSvc = myIMdmClearHelperSvc.getDeleteExpungeSvc();
|
||||
int deletedRecords = deleteExpungeSvc.deleteExpunge(thePersistentIds, false, null, myChunkId);
|
||||
int deletedRecords = deleteExpungeSvc.deleteExpunge(thePersistentIds, false, null, myWorkChunk);
|
||||
|
||||
ourLog.trace(
|
||||
"Deleted {} of {} golden resources in {}",
|
||||
|
@ -160,7 +161,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
|
|||
sw,
|
||||
sw.formatThroughput(thePersistentIds.size(), TimeUnit.SECONDS),
|
||||
myInstanceId,
|
||||
myChunkId);
|
||||
myWorkChunk);
|
||||
|
||||
if (ourClearCompletionCallbackForUnitTest != null) {
|
||||
ourClearCompletionCallbackForUnitTest.run();
|
||||
|
|
|
@ -19,13 +19,15 @@
|
|||
*/
|
||||
package ca.uhn.fhir.jpa.api.svc;
|
||||
|
||||
import ca.uhn.fhir.model.api.IModelJson;
|
||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface IDeleteExpungeSvc<T extends IResourcePersistentId<?>> {
|
||||
|
||||
int deleteExpunge(List<T> thePersistentIds, boolean theCascade, Integer theCascadeMaxRounds, String theChunkid);
|
||||
<PT extends IModelJson> int deleteExpunge(
|
||||
List<T> thePersistentIds, boolean theCascade, Integer theCascadeMaxRounds, PT theWorkChunk);
|
||||
|
||||
boolean isCascadeSupported();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue