move expunge core classes and interfaces to storage project (#3529)
* move expunge core classes and interfaces to storage project * post-merge clean-up * changelog
This commit is contained in:
parent
8d116b932e
commit
f361aa5998
|
@ -0,0 +1,4 @@
|
||||||
|
type: change
|
||||||
|
issue: 3529
|
||||||
|
title: "Several classes and interfaces related to the `$expunge` operation have moved from the hapi-fhir-jpaserver-base
|
||||||
|
project to the hapi-fhir-storage project."
|
|
@ -58,7 +58,7 @@ public class MdmLinkDeleter implements ItemProcessor<List<Long>, List<Long>> {
|
||||||
public List<Long> process(List<Long> thePidList) throws Exception {
|
public List<Long> process(List<Long> thePidList) throws Exception {
|
||||||
ConcurrentLinkedQueue<Long> goldenPidAggregator = new ConcurrentLinkedQueue<>();
|
ConcurrentLinkedQueue<Long> goldenPidAggregator = new ConcurrentLinkedQueue<>();
|
||||||
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
|
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
|
||||||
partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(pids, goldenPidAggregator));
|
partitionRunner.runInPartitionedThreads(thePidList, pids -> removeLinks(pids, goldenPidAggregator));
|
||||||
return new ArrayList<>(goldenPidAggregator);
|
return new ArrayList<>(goldenPidAggregator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,8 @@ import ca.uhn.fhir.jpa.batch.config.BatchConstants;
|
||||||
import ca.uhn.fhir.jpa.batch.job.PartitionedUrlValidator;
|
import ca.uhn.fhir.jpa.batch.job.PartitionedUrlValidator;
|
||||||
import ca.uhn.fhir.jpa.batch.mdm.MdmClearJobSubmitterImpl;
|
import ca.uhn.fhir.jpa.batch.mdm.MdmClearJobSubmitterImpl;
|
||||||
import ca.uhn.fhir.jpa.batch.reader.BatchResourceSearcher;
|
import ca.uhn.fhir.jpa.batch.reader.BatchResourceSearcher;
|
||||||
import ca.uhn.fhir.jpa.binary.provider.BinaryAccessProvider;
|
|
||||||
import ca.uhn.fhir.jpa.binary.interceptor.BinaryStorageInterceptor;
|
import ca.uhn.fhir.jpa.binary.interceptor.BinaryStorageInterceptor;
|
||||||
|
import ca.uhn.fhir.jpa.binary.provider.BinaryAccessProvider;
|
||||||
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
|
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
|
||||||
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
|
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
|
||||||
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
|
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
|
||||||
|
@ -38,6 +38,7 @@ import ca.uhn.fhir.jpa.dao.TransactionProcessor;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
|
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
|
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
|
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
|
||||||
|
import ca.uhn.fhir.jpa.dao.expunge.IExpungeEverythingService;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.IResourceExpungeService;
|
import ca.uhn.fhir.jpa.dao.expunge.IResourceExpungeService;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ResourceExpungeService;
|
import ca.uhn.fhir.jpa.dao.expunge.ResourceExpungeService;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ResourceTableFKProvider;
|
import ca.uhn.fhir.jpa.dao.expunge.ResourceTableFKProvider;
|
||||||
|
@ -758,7 +759,7 @@ public class JpaConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ExpungeEverythingService expungeEverythingService() {
|
public IExpungeEverythingService expungeEverythingService() {
|
||||||
return new ExpungeEverythingService();
|
return new ExpungeEverythingService();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class ExpungeEverythingService {
|
public class ExpungeEverythingService implements IExpungeEverythingService {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeEverythingService.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeEverythingService.class);
|
||||||
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
|
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
|
||||||
protected EntityManager myEntityManager;
|
protected EntityManager myEntityManager;
|
||||||
|
@ -111,6 +111,7 @@ public class ExpungeEverythingService {
|
||||||
myTxTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
myTxTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void expungeEverything(@Nullable RequestDetails theRequest) {
|
public void expungeEverything(@Nullable RequestDetails theRequest) {
|
||||||
|
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
|
@ -224,6 +225,7 @@ public class ExpungeEverythingService {
|
||||||
return outcome;
|
return outcome;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int expungeEverythingByType(Class<?> theEntityType) {
|
public int expungeEverythingByType(Class<?> theEntityType) {
|
||||||
int result = expungeEverythingByTypeWithoutPurging(theEntityType);
|
int result = expungeEverythingByTypeWithoutPurging(theEntityType);
|
||||||
purgeAllCaches();
|
purgeAllCaches();
|
||||||
|
|
|
@ -45,7 +45,6 @@ import ca.uhn.fhir.jpa.dao.data.IResourceProvenanceDao;
|
||||||
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
|
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
|
||||||
import ca.uhn.fhir.jpa.dao.data.IResourceTagDao;
|
import ca.uhn.fhir.jpa.dao.data.IResourceTagDao;
|
||||||
import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
|
import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
|
||||||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
|
||||||
import ca.uhn.fhir.jpa.model.entity.ForcedId;
|
import ca.uhn.fhir.jpa.model.entity.ForcedId;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||||
|
@ -126,42 +125,45 @@ public class ResourceExpungeService implements IResourceExpungeService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theRemainingCount) {
|
public List<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theRemainingCount) {
|
||||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||||
|
|
||||||
|
Slice<Long> ids;
|
||||||
if (theResourceId != null) {
|
if (theResourceId != null) {
|
||||||
if (theVersion != null) {
|
if (theVersion != null) {
|
||||||
return toSlice(myResourceHistoryTableDao.findForIdAndVersionAndFetchProvenance(theResourceId, theVersion));
|
ids = toSlice(myResourceHistoryTableDao.findForIdAndVersionAndFetchProvenance(theResourceId, theVersion));
|
||||||
} else {
|
} else {
|
||||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
|
ids = myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (theResourceName != null) {
|
if (theResourceName != null) {
|
||||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
|
ids = myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
|
||||||
} else {
|
} else {
|
||||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
|
ids = myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return ids.getContent();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theRemainingCount) {
|
public List<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theRemainingCount) {
|
||||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||||
|
Slice<Long> ids;
|
||||||
if (theResourceId != null) {
|
if (theResourceId != null) {
|
||||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
|
ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
|
||||||
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
|
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
|
||||||
return ids;
|
|
||||||
} else {
|
} else {
|
||||||
if (theResourceName != null) {
|
if (theResourceName != null) {
|
||||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
|
ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
|
||||||
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
|
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
|
||||||
return ids;
|
|
||||||
} else {
|
} else {
|
||||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResources(page);
|
ids = myResourceTableDao.findIdsOfDeletedResources(page);
|
||||||
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements());
|
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements());
|
||||||
return ids;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return ids.getContent();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -33,8 +33,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.batch.item.ItemProcessor;
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.domain.Slice;
|
|
||||||
import org.springframework.data.domain.SliceImpl;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -62,7 +60,7 @@ public class DeleteExpungeProcessor implements ItemProcessor<List<Long>, List<St
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> process(List<Long> thePids) throws Exception {
|
public List<String> process(List<Long> thePids) throws Exception {
|
||||||
validateOkToDeleteAndExpunge(new SliceImpl<>(thePids));
|
validateOkToDeleteAndExpunge(thePids);
|
||||||
|
|
||||||
List<String> retval = new ArrayList<>();
|
List<String> retval = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -79,7 +77,7 @@ public class DeleteExpungeProcessor implements ItemProcessor<List<Long>, List<St
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void validateOkToDeleteAndExpunge(Slice<Long> thePids) {
|
public void validateOkToDeleteAndExpunge(List<Long> thePids) {
|
||||||
if (!myDaoConfig.isEnforceReferentialIntegrityOnDelete()) {
|
if (!myDaoConfig.isEnforceReferentialIntegrityOnDelete()) {
|
||||||
ourLog.info("Referential integrity on delete disabled. Skipping referential integrity check.");
|
ourLog.info("Referential integrity on delete disabled. Skipping referential integrity check.");
|
||||||
return;
|
return;
|
||||||
|
@ -87,7 +85,7 @@ public class DeleteExpungeProcessor implements ItemProcessor<List<Long>, List<St
|
||||||
|
|
||||||
List<ResourceLink> conflictResourceLinks = Collections.synchronizedList(new ArrayList<>());
|
List<ResourceLink> conflictResourceLinks = Collections.synchronizedList(new ArrayList<>());
|
||||||
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount());
|
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount());
|
||||||
partitionRunner.runInPartitionedThreads(thePids, someTargetPids -> findResourceLinksWithTargetPidIn(thePids.getContent(), someTargetPids, conflictResourceLinks));
|
partitionRunner.runInPartitionedThreads(thePids, someTargetPids -> findResourceLinksWithTargetPidIn(thePids, someTargetPids, conflictResourceLinks));
|
||||||
|
|
||||||
if (conflictResourceLinks.isEmpty()) {
|
if (conflictResourceLinks.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -22,30 +22,26 @@ package ca.uhn.fhir.jpa.mdm.interceptor;
|
||||||
|
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.dao.expunge.IExpungeEverythingService;
|
||||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||||
import ca.uhn.fhir.mdm.api.MdmConstants;
|
import ca.uhn.fhir.mdm.api.MdmConstants;
|
||||||
import ca.uhn.fhir.mdm.api.IMdmSettings;
|
import ca.uhn.fhir.mdm.api.IMdmSettings;
|
||||||
import ca.uhn.fhir.mdm.model.CanonicalEID;
|
import ca.uhn.fhir.mdm.model.CanonicalEID;
|
||||||
import ca.uhn.fhir.mdm.util.EIDHelper;
|
import ca.uhn.fhir.mdm.util.EIDHelper;
|
||||||
import ca.uhn.fhir.mdm.util.MdmResourceUtil;
|
import ca.uhn.fhir.mdm.util.MdmResourceUtil;
|
||||||
import ca.uhn.fhir.mdm.util.GoldenResourceHelper;
|
|
||||||
import ca.uhn.fhir.interceptor.api.Hook;
|
import ca.uhn.fhir.interceptor.api.Hook;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
import ca.uhn.fhir.jpa.dao.mdm.MdmLinkDeleteSvc;
|
import ca.uhn.fhir.jpa.dao.mdm.MdmLinkDeleteSvc;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
|
|
||||||
import ca.uhn.fhir.jpa.entity.MdmLink;
|
import ca.uhn.fhir.jpa.entity.MdmLink;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
|
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
|
||||||
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.r4.model.Extension;
|
|
||||||
import org.hl7.fhir.r4.model.Patient;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -56,7 +52,7 @@ public class MdmStorageInterceptor implements IMdmStorageInterceptor {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(MdmStorageInterceptor.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(MdmStorageInterceptor.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ExpungeEverythingService myExpungeEverythingService;
|
private IExpungeEverythingService myExpungeEverythingService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private MdmLinkDeleteSvc myMdmLinkDeleteSvc;
|
private MdmLinkDeleteSvc myMdmLinkDeleteSvc;
|
||||||
@Autowired
|
@Autowired
|
||||||
|
|
|
@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.mdm.svc;
|
||||||
|
|
||||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
||||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
|
import ca.uhn.fhir.jpa.dao.expunge.IExpungeEverythingService;
|
||||||
import ca.uhn.fhir.jpa.entity.MdmLink;
|
import ca.uhn.fhir.jpa.entity.MdmLink;
|
||||||
import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test;
|
import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test;
|
||||||
import ca.uhn.fhir.mdm.api.IMdmLinkSvc;
|
import ca.uhn.fhir.mdm.api.IMdmLinkSvc;
|
||||||
|
@ -35,7 +35,7 @@ public class MdmLinkSvcTest extends BaseMdmR4Test {
|
||||||
@Autowired
|
@Autowired
|
||||||
IMdmLinkSvc myMdmLinkSvc;
|
IMdmLinkSvc myMdmLinkSvc;
|
||||||
@Autowired
|
@Autowired
|
||||||
ExpungeEverythingService myExpungeEverythingService;
|
IExpungeEverythingService myExpungeEverythingService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
|
|
@ -9,8 +9,6 @@ import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.data.domain.Slice;
|
|
||||||
import org.springframework.data.domain.SliceImpl;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -36,7 +34,7 @@ public class PartitionRunnerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void emptyList() {
|
public void emptyList() {
|
||||||
Slice<Long> resourceIds = buildSlice(0);
|
List<Long> resourceIds = buildPidList(0);
|
||||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||||
myLatch.setExpectedCount(0);
|
myLatch.setExpectedCount(0);
|
||||||
|
|
||||||
|
@ -56,17 +54,17 @@ public class PartitionRunnerTest {
|
||||||
return new PartitionRunner("TEST", "test", theBatchSize, theThreadCount);
|
return new PartitionRunner("TEST", "test", theBatchSize, theThreadCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Slice<Long> buildSlice(int size) {
|
private List<Long> buildPidList(int size) {
|
||||||
List<Long> list = new ArrayList<>();
|
List<Long> list = new ArrayList<>();
|
||||||
for (long i = 0; i < size; ++i) {
|
for (long i = 0; i < size; ++i) {
|
||||||
list.add(i + 1);
|
list.add(i + 1);
|
||||||
}
|
}
|
||||||
return new SliceImpl(list);
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void oneItem() throws InterruptedException {
|
public void oneItem() throws InterruptedException {
|
||||||
Slice<Long> resourceIds = buildSlice(1);
|
List<Long> resourceIds = buildPidList(1);
|
||||||
|
|
||||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||||
myLatch.setExpectedCount(1);
|
myLatch.setExpectedCount(1);
|
||||||
|
@ -79,7 +77,7 @@ public class PartitionRunnerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void twoItems() throws InterruptedException {
|
public void twoItems() throws InterruptedException {
|
||||||
Slice<Long> resourceIds = buildSlice(2);
|
List<Long> resourceIds = buildPidList(2);
|
||||||
|
|
||||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||||
myLatch.setExpectedCount(1);
|
myLatch.setExpectedCount(1);
|
||||||
|
@ -91,7 +89,7 @@ public class PartitionRunnerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void tenItemsBatch5() throws InterruptedException {
|
public void tenItemsBatch5() throws InterruptedException {
|
||||||
Slice<Long> resourceIds = buildSlice(10);
|
List<Long> resourceIds = buildPidList(10);
|
||||||
|
|
||||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||||
myLatch.setExpectedCount(2);
|
myLatch.setExpectedCount(2);
|
||||||
|
@ -108,7 +106,7 @@ public class PartitionRunnerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void nineItemsBatch5() throws InterruptedException {
|
public void nineItemsBatch5() throws InterruptedException {
|
||||||
Slice<Long> resourceIds = buildSlice(9);
|
List<Long> resourceIds = buildPidList(9);
|
||||||
|
|
||||||
// We don't care in which order, but one partition size should be
|
// We don't care in which order, but one partition size should be
|
||||||
// 5 and one should be 4
|
// 5 and one should be 4
|
||||||
|
@ -129,7 +127,7 @@ public class PartitionRunnerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void tenItemsOneThread() throws InterruptedException {
|
public void tenItemsOneThread() throws InterruptedException {
|
||||||
Slice<Long> resourceIds = buildSlice(10);
|
List<Long> resourceIds = buildPidList(10);
|
||||||
|
|
||||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||||
myLatch.setExpectedCount(2);
|
myLatch.setExpectedCount(2);
|
||||||
|
|
|
@ -28,9 +28,9 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.data.domain.Slice;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expungeDeletedResources() {
|
private void expungeDeletedResources() {
|
||||||
Slice<Long> resourceIds = findHistoricalVersionsOfDeletedResources();
|
List<Long> resourceIds = findHistoricalVersionsOfDeletedResources();
|
||||||
|
|
||||||
deleteHistoricalVersions(resourceIds);
|
deleteHistoricalVersions(resourceIds);
|
||||||
if (expungeLimitReached()) {
|
if (expungeLimitReached()) {
|
||||||
|
@ -92,13 +92,13 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
deleteCurrentVersionsOfDeletedResources(resourceIds);
|
deleteCurrentVersionsOfDeletedResources(resourceIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Slice<Long> findHistoricalVersionsOfDeletedResources() {
|
private List<Long> findHistoricalVersionsOfDeletedResources() {
|
||||||
Slice<Long> retVal = myExpungeDaoService.findHistoricalVersionsOfDeletedResources(myResourceName, myResourceId, myRemainingCount.get());
|
List<Long> retVal = myExpungeDaoService.findHistoricalVersionsOfDeletedResources(myResourceName, myResourceId, myRemainingCount.get());
|
||||||
ourLog.debug("Found {} historical versions", retVal.getSize());
|
ourLog.debug("Found {} historical versions", retVal.size());
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Slice<Long> findHistoricalVersionsOfNonDeletedResources() {
|
private List<Long> findHistoricalVersionsOfNonDeletedResources() {
|
||||||
return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources(myResourceName, myResourceId, myVersion, myRemainingCount.get());
|
return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources(myResourceName, myResourceId, myVersion, myRemainingCount.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expungeOldVersions() {
|
private void expungeOldVersions() {
|
||||||
Slice<Long> historicalIds = findHistoricalVersionsOfNonDeletedResources();
|
List<Long> historicalIds = findHistoricalVersionsOfNonDeletedResources();
|
||||||
|
|
||||||
getPartitionRunner().runInPartitionedThreads(historicalIds, partition -> myExpungeDaoService.expungeHistoricalVersions(myRequestDetails, partition, myRemainingCount));
|
getPartitionRunner().runInPartitionedThreads(historicalIds, partition -> myExpungeDaoService.expungeHistoricalVersions(myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
@ -120,11 +120,11 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
return new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount());
|
return new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getExpungeBatchSize(), myDaoConfig.getExpungeThreadCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteCurrentVersionsOfDeletedResources(Slice<Long> theResourceIds) {
|
private void deleteCurrentVersionsOfDeletedResources(List<Long> theResourceIds) {
|
||||||
getPartitionRunner().runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeCurrentVersionOfResources(myRequestDetails, partition, myRemainingCount));
|
getPartitionRunner().runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeCurrentVersionOfResources(myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteHistoricalVersions(Slice<Long> theResourceIds) {
|
private void deleteHistoricalVersions(List<Long> theResourceIds) {
|
||||||
getPartitionRunner().runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds(myRequestDetails, partition, myRemainingCount));
|
getPartitionRunner().runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds(myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ public class ExpungeService {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ExpungeEverythingService myExpungeEverythingService;
|
private IExpungeEverythingService myExpungeEverythingService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private IResourceExpungeService myExpungeDaoService;
|
private IResourceExpungeService myExpungeDaoService;
|
||||||
@Autowired
|
@Autowired
|
|
@ -0,0 +1,11 @@
|
||||||
|
package ca.uhn.fhir.jpa.dao.expunge;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
public interface IExpungeEverythingService {
|
||||||
|
void expungeEverything(@Nullable RequestDetails theRequest);
|
||||||
|
|
||||||
|
int expungeEverythingByType(Class<?> theEntityType);
|
||||||
|
}
|
|
@ -21,15 +21,14 @@ package ca.uhn.fhir.jpa.dao.expunge;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import org.springframework.data.domain.Slice;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public interface IResourceExpungeService {
|
public interface IResourceExpungeService {
|
||||||
Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theI);
|
List<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theI);
|
||||||
|
|
||||||
Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theI);
|
List<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theI);
|
||||||
|
|
||||||
void expungeHistoricalVersions(RequestDetails theRequestDetails, List<Long> thePartition, AtomicInteger theRemainingCount);
|
void expungeHistoricalVersions(RequestDetails theRequestDetails, List<Long> thePartition, AtomicInteger theRemainingCount);
|
||||||
|
|
|
@ -27,7 +27,6 @@ import com.google.common.collect.Lists;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.data.domain.Slice;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -58,7 +57,7 @@ public class PartitionRunner {
|
||||||
myThreadCount = theThreadCount;
|
myThreadCount = theThreadCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
public void runInPartitionedThreads(List<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
||||||
|
|
||||||
List<Callable<Void>> callableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
|
List<Callable<Void>> callableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
|
||||||
if (callableTasks.size() == 0) {
|
if (callableTasks.size() == 0) {
|
||||||
|
@ -93,15 +92,15 @@ public class PartitionRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
private List<Callable<Void>> buildCallableTasks(List<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
||||||
List<Callable<Void>> retval = new ArrayList<>();
|
List<Callable<Void>> retval = new ArrayList<>();
|
||||||
|
|
||||||
if (myBatchSize > theResourceIds.getContent().size()) {
|
if (myBatchSize > theResourceIds.size()) {
|
||||||
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
|
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
|
||||||
} else {
|
} else {
|
||||||
ourLog.info("Creating batch job of {} entries", theResourceIds.getContent().size());
|
ourLog.info("Creating batch job of {} entries", theResourceIds.size());
|
||||||
}
|
}
|
||||||
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myBatchSize);
|
List<List<Long>> partitions = Lists.partition(theResourceIds, myBatchSize);
|
||||||
|
|
||||||
for (List<Long> nextPartition : partitions) {
|
for (List<Long> nextPartition : partitions) {
|
||||||
if (nextPartition.size() > 0) {
|
if (nextPartition.size() > 0) {
|
Loading…
Reference in New Issue