Delete performance (#1286)

* move expunge out

* move to prototype

* turns parameters into fields

* cleanup control flow

* moved transactions outside of loops

* add expungeeverything

* moved daos out

* add expungeThreadCount

* add expungeBatchSize

* added partition runner to run in separate threads

* all done.  just need to consolidate test code.

* Moar tests

* consolidated pointcutlatch into hapi-fhir-jpaserver-model

* final cleanup

* update javadoc

* change log

* failing test

* added delete
also @Transactional

* remove unused parameter

* fix compile

* race condition

(cherry picked from commit e1940d2fb20838859a205f2b96c833b0ce9f05eb)

# Conflicts:
#	hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/expunge/PartitionRunnerTest.java

* ja feedback

* ja feedback
This commit is contained in:
Ken Stevens 2019-04-25 15:38:37 -04:00 committed by GitHub
parent 9e9a1ed06d
commit 130e879d04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1100 additions and 491 deletions

View File

@ -5,10 +5,13 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
import ca.uhn.fhir.jpa.dao.index.DaoSearchParamSynchronizer;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.dao.index.SearchParamWithInlineReferencesExtractor;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.entity.ResourceSearchView;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.model.entity.*;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
@ -20,8 +23,6 @@ import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
import ca.uhn.fhir.jpa.util.DeleteConflict;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
@ -41,10 +42,11 @@ import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.*;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.rest.server.interceptor.IServerOperationInterceptor;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.CoverageIgnore;
import ca.uhn.fhir.util.OperationOutcomeUtil;
@ -52,7 +54,6 @@ import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.XmlUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
@ -62,20 +63,16 @@ import org.hibernate.Session;
import org.hibernate.internal.SessionImpl;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.*;
import javax.persistence.criteria.CriteriaBuilder;
@ -86,7 +83,6 @@ import javax.xml.stream.events.Characters;
import javax.xml.stream.events.XMLEvent;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.StringUtils.*;
@ -120,7 +116,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
public static final String OO_SEVERITY_ERROR = "error";
public static final String OO_SEVERITY_INFO = "information";
public static final String OO_SEVERITY_WARN = "warning";
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirDao.class);
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiFhirDao.class);
private static final Map<FhirVersionEnum, FhirContext> ourRetrievalContexts = new HashMap<>();
private static final String PROCESSING_SUB_REQUEST = "BaseHapiFhirDao.processingSubRequest";
private static boolean ourValidationDisabledForUnitTest;
@ -133,54 +129,30 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
@Autowired
protected IForcedIdDao myForcedIdDao;
@Autowired
protected ISearchResultDao mySearchResultDao;
@Autowired(required = false)
protected IFulltextSearchSvc myFulltextSearchSvc;
@Autowired()
protected IResourceIndexedSearchParamUriDao myResourceIndexedSearchParamUriDao;
@Autowired()
protected IResourceIndexedSearchParamStringDao myResourceIndexedSearchParamStringDao;
@Autowired()
protected IResourceIndexedSearchParamTokenDao myResourceIndexedSearchParamTokenDao;
@Autowired
protected IResourceLinkDao myResourceLinkDao;
@Autowired()
protected IResourceIndexedSearchParamDateDao myResourceIndexedSearchParamDateDao;
@Autowired()
protected IResourceIndexedSearchParamQuantityDao myResourceIndexedSearchParamQuantityDao;
@Autowired()
protected IResourceIndexedSearchParamCoordsDao myResourceIndexedSearchParamCoordsDao;
@Autowired()
protected IResourceIndexedSearchParamNumberDao myResourceIndexedSearchParamNumberDao;
@Autowired
protected ISearchCoordinatorSvc mySearchCoordinatorSvc;
@Autowired
protected ISearchParamRegistry mySerarchParamRegistry;
@Autowired()
@Autowired
protected IHapiTerminologySvc myTerminologySvc;
@Autowired
protected IResourceHistoryTableDao myResourceHistoryTableDao;
@Autowired
protected IResourceHistoryTagDao myResourceHistoryTagDao;
@Autowired
protected IResourceTableDao myResourceTableDao;
@Autowired
protected IResourceTagDao myResourceTagDao;
@Autowired
protected IResourceSearchViewDao myResourceViewDao;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
@Autowired(required = true)
@Autowired
private DaoConfig myConfig;
private FhirContext myContext;
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@Autowired
private ISearchDao mySearchDao;
@Autowired
private ISearchParamPresenceSvc mySearchParamPresenceSvc;
//@Autowired
//private ISearchResultDao mySearchResultDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
@ -189,11 +161,31 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
private DaoSearchParamSynchronizer myDaoSearchParamSynchronizer;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
ExpungeService myExpungeService;
private FhirContext myContext;
private ApplicationContext myApplicationContext;
@Autowired
protected IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
public void setContext(FhirContext theContext) {
myContext = theContext;
}
@Override
public void setApplicationContext(ApplicationContext theApplicationContext) throws BeansException {
/*
* We do a null check here because Smile's module system tries to
* initialize the application context twice if two modules depend on
* the persistence module. The second time sets the dependency's appctx.
*/
if (myApplicationContext == null) {
myApplicationContext = theApplicationContext;
}
}
/**
* Returns the newly created forced ID. If the entity already had a forced ID, or if
* none was created, returns null.
@ -215,261 +207,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return null;
}
protected ExpungeOutcome doExpunge(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
ourLog.info("Expunge: ResourceName[{}] Id[{}] Version[{}] Options[{}]", theResourceName, theResourceId, theVersion, theExpungeOptions);
if (!getConfig().isExpungeEnabled()) {
throw new MethodNotAllowedException("$expunge is not enabled on this server");
}
if (theExpungeOptions.getLimit() < 1) {
throw new InvalidRequestException("Expunge limit may not be less than 1. Received expunge limit " + theExpungeOptions.getLimit() + ".");
}
AtomicInteger remainingCount = new AtomicInteger(theExpungeOptions.getLimit());
if (theResourceName == null && theResourceId == null && theVersion == null) {
if (theExpungeOptions.isExpungeEverything()) {
doExpungeEverything();
}
}
if (theExpungeOptions.isExpungeDeletedResources() && theVersion == null) {
/*
* Delete historical versions of deleted resources
*/
Pageable page = PageRequest.of(0, remainingCount.get());
Slice<Long> resourceIds = txTemplate.execute(t -> {
if (theResourceId != null) {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
return ids;
} else {
if (theResourceName != null) {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
return ids;
} else {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResources(page);
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements(), theResourceName);
return ids;
}
}
});
/*
* Delete any search result cache entries pointing to the given resource. We do
* this in batches to avoid sending giant batches of parameters to the DB
*/
List<List<Long>> partitions = Lists.partition(resourceIds.getContent(), 800);
for (List<Long> nextPartition : partitions) {
ourLog.info("Expunging any search results pointing to {} resources", nextPartition.size());
txTemplate.execute(t -> {
mySearchResultDao.deleteByResourceIds(nextPartition);
return null;
});
}
/*
* Delete historical versions
*/
for (Long next : resourceIds) {
txTemplate.execute(t -> {
expungeHistoricalVersionsOfId(next, remainingCount);
return null;
});
if (remainingCount.get() <= 0) {
ourLog.debug("Expunge limit has been hit - Stopping operation");
return toExpungeOutcome(theExpungeOptions, remainingCount);
}
}
/*
* Delete current versions of deleted resources
*/
for (Long next : resourceIds) {
txTemplate.execute(t -> {
expungeCurrentVersionOfResource(next, remainingCount);
return null;
});
if (remainingCount.get() <= 0) {
ourLog.debug("Expunge limit has been hit - Stopping operation");
return toExpungeOutcome(theExpungeOptions, remainingCount);
}
}
}
if (theExpungeOptions.isExpungeOldVersions()) {
/*
* Delete historical versions of non-deleted resources
*/
Pageable page = PageRequest.of(0, remainingCount.get());
Slice<Long> historicalIds = txTemplate.execute(t -> {
if (theResourceId != null) {
if (theVersion != null) {
return toSlice(myResourceHistoryTableDao.findForIdAndVersion(theResourceId, theVersion));
} else {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
}
} else {
if (theResourceName != null) {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
} else {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
}
}
});
for (Long next : historicalIds) {
txTemplate.execute(t -> {
expungeHistoricalVersion(next);
if (remainingCount.decrementAndGet() <= 0) {
return toExpungeOutcome(theExpungeOptions, remainingCount);
}
return null;
});
}
}
return toExpungeOutcome(theExpungeOptions, remainingCount);
}
private void doExpungeEverything() {
final AtomicInteger counter = new AtomicInteger();
ourLog.info("BEGINNING GLOBAL $expunge");
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d"));
for (TermCodeSystem next : myEntityManager.createQuery("SELECT c FROM " + TermCodeSystem.class.getName() + " c", TermCodeSystem.class).getResultList()) {
next.setCurrentVersion(null);
myEntityManager.merge(next);
}
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d"));
return null;
});
ourLog.info("COMPLETED GLOBAL $expunge - Deleted {} rows", counter.get());
}
private int doExpungeEverythingQuery(String theQuery) {
StopWatch sw = new StopWatch();
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
return outcome;
}
private void expungeCurrentVersionOfResource(Long theResourceId, AtomicInteger theRemainingCount) {
ResourceTable resource = myResourceTableDao.findById(theResourceId).orElseThrow(IllegalStateException::new);
ResourceHistoryTable currentVersion = myResourceHistoryTableDao.findForIdAndVersion(resource.getId(), resource.getVersion());
if (currentVersion != null) {
expungeHistoricalVersion(currentVersion.getId());
}
ourLog.info("Expunging current version of resource {}", resource.getIdDt().getValue());
myResourceIndexedSearchParamUriDao.deleteAll(resource.getParamsUri());
myResourceIndexedSearchParamCoordsDao.deleteAll(resource.getParamsCoords());
myResourceIndexedSearchParamDateDao.deleteAll(resource.getParamsDate());
myResourceIndexedSearchParamNumberDao.deleteAll(resource.getParamsNumber());
myResourceIndexedSearchParamQuantityDao.deleteAll(resource.getParamsQuantity());
myResourceIndexedSearchParamStringDao.deleteAll(resource.getParamsString());
myResourceIndexedSearchParamTokenDao.deleteAll(resource.getParamsToken());
myResourceLinkDao.deleteAll(resource.getResourceLinks());
myResourceLinkDao.deleteAll(resource.getResourceLinksAsTarget());
myResourceTagDao.deleteAll(resource.getTags());
resource.getTags().clear();
if (resource.getForcedId() != null) {
ForcedId forcedId = resource.getForcedId();
resource.setForcedId(null);
myResourceTableDao.saveAndFlush(resource);
myIdHelperService.delete(forcedId);
}
myResourceTableDao.delete(resource);
theRemainingCount.decrementAndGet();
}
protected void expungeHistoricalVersion(Long theNextVersionId) {
ResourceHistoryTable version = myResourceHistoryTableDao.findById(theNextVersionId).orElseThrow(IllegalArgumentException::new);
ourLog.info("Deleting resource version {}", version.getIdDt().getValue());
myResourceHistoryTagDao.deleteAll(version.getTags());
myResourceHistoryTableDao.delete(version);
}
protected void expungeHistoricalVersionsOfId(Long theResourceId, AtomicInteger theRemainingCount) {
ResourceTable resource = myResourceTableDao.findById(theResourceId).orElseThrow(IllegalArgumentException::new);
Pageable page = PageRequest.of(0, theRemainingCount.get());
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
for (Long nextVersionId : versionIds) {
expungeHistoricalVersion(nextVersionId);
if (theRemainingCount.decrementAndGet() <= 0) {
return;
}
}
}
private void extractTagsHapi(IResource theResource, ResourceTable theEntity, Set<ResourceTag> allDefs) {
TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource);
if (tagList != null) {
@ -602,11 +339,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return myContext;
}
@Autowired
public void setContext(FhirContext theContext) {
myContext = theContext;
}
public FhirContext getContext(FhirVersionEnum theVersion) {
Validate.notNull(theVersion, "theVersion must not be null");
synchronized (ourRetrievalContexts) {
@ -1055,18 +787,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
throw new NotImplementedException("");
}
@Override
public void setApplicationContext(ApplicationContext theApplicationContext) throws BeansException {
/*
* We do a null check here because Smile's module system tries to
* initialize the application context twice if two modules depend on
* the persistence module. The second time sets the dependency's appctx.
*/
if (myApplicationContext == null) {
myApplicationContext = theApplicationContext;
}
}
/**
* This method is called when an update to an existing resource detects that the resource supplied for update is missing a tag/profile/security label that the currently persisted resource holds.
* <p>
@ -1117,11 +837,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return false;
}
private ExpungeOutcome toExpungeOutcome(ExpungeOptions theExpungeOptions, AtomicInteger theRemainingCount) {
return new ExpungeOutcome()
.setDeletedCount(theExpungeOptions.getLimit() - theRemainingCount.get());
}
@Override
public IBaseResource toResource(BaseHasResource theEntity, boolean theForHistoryOperation) {
RuntimeResourceDefinition type = myContext.getResourceDefinition(theEntity.getResourceType());
@ -1248,9 +963,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return myContext.getResourceDefinition(theResource).getName();
}
private Slice<Long> toSlice(ResourceHistoryTable theVersion) {
Validate.notNull(theVersion);
return new SliceImpl<>(Collections.singletonList(theVersion.getId()));
protected ResourceTable updateEntityForDelete(RequestDetails theRequest, ResourceTable entity) {
Date updateTime = new Date();
return updateEntity(theRequest, null, entity, updateTime, true, true, updateTime, false, true);
}
@SuppressWarnings("unchecked")
@ -1284,14 +999,13 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
theEntity.setPublished(theUpdateTime);
}
ResourceIndexedSearchParams existingParams = new ResourceIndexedSearchParams(theEntity);
ResourceIndexedSearchParams existingParams = null;
ResourceIndexedSearchParams newParams = null;
EncodedResource changed;
if (theDeletedTimestampOrNull != null) {
newParams = new ResourceIndexedSearchParams();
// DELETE
theEntity.setDeleted(theDeletedTimestampOrNull);
theEntity.setUpdated(theDeletedTimestampOrNull);
@ -1302,7 +1016,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
changed = populateResourceIntoEntity(theRequest, theResource, theEntity, true);
} else {
// CREATE or UPDATE
existingParams = new ResourceIndexedSearchParams(theEntity);
theEntity.setDeleted(null);
if (thePerformIndexing) {
@ -1392,7 +1107,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
* index table for resource links (reference indexes) because we index
* those by path and not by parameter name.
*/
if (thePerformIndexing) {
if (thePerformIndexing && newParams != null) {
Map<String, Boolean> presentSearchParams = new HashMap<>();
for (String nextKey : newParams.getPopulatedResourceLinkParameters()) {
presentSearchParams.put(nextKey, Boolean.TRUE);
@ -1412,8 +1127,12 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
* Indexing
*/
if (thePerformIndexing) {
myDaoSearchParamSynchronizer.synchronizeSearchParamsToDatabase(newParams, theEntity, existingParams);
mySearchParamWithInlineReferencesExtractor.storeCompositeStringUniques(newParams, theEntity, existingParams);
if (newParams == null) {
myExpungeService.deleteAllSearchParams(theEntity.getId());
} else {
myDaoSearchParamSynchronizer.synchronizeSearchParamsToDatabase(newParams, theEntity, existingParams);
mySearchParamWithInlineReferencesExtractor.storeCompositeStringUniques(newParams, theEntity, existingParams);
}
}
if (theResource != null) {
@ -1424,11 +1143,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return theEntity;
}
protected ResourceTable updateEntity(RequestDetails theRequest, IBaseResource theResource, ResourceTable
entity, Date theDeletedTimestampOrNull, Date theUpdateTime) {
return updateEntity(theRequest, theResource, entity, theDeletedTimestampOrNull, true, true, theUpdateTime, false, true);
}
public ResourceTable updateInternal(RequestDetails theRequestDetails, T theResource, boolean thePerformIndexing, boolean theForceUpdateVersion,
ResourceTable theEntity, IIdType theResourceId, IBaseResource theOldResource) {

View File

@ -90,12 +90,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
protected DaoConfig myDaoConfig;
@Autowired
private MatchResourceUrlService myMatchResourceUrlService;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
private String myResourceName;
private Class<T> myResourceType;
private String mySecondaryPrimaryKeyParamName;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Override
public void addTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {
@ -236,8 +236,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
}
Date updateTime = new Date();
ResourceTable savedEntity = updateEntity(theRequest, null, entity, updateTime, updateTime);
ResourceTable savedEntity = updateEntityForDelete(theRequest, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
@ -285,15 +284,15 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
public DeleteMethodOutcome deleteByUrl(String theUrl, List<DeleteConflict> deleteConflicts, RequestDetails theRequest) {
StopWatch w = new StopWatch();
Set<Long> resource = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType);
if (resource.size() > 1) {
Set<Long> resourceIds = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType);
if (resourceIds.size() > 1) {
if (myDaoConfig.isAllowMultipleDelete() == false) {
throw new PreconditionFailedException(getContext().getLocalizer().getMessage(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resource.size()));
throw new PreconditionFailedException(getContext().getLocalizer().getMessage(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resourceIds.size()));
}
}
List<ResourceTable> deletedResources = new ArrayList<ResourceTable>();
for (Long pid : resource) {
List<ResourceTable> deletedResources = new ArrayList<>();
for (Long pid : resourceIds) {
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid);
deletedResources.add(entity);
@ -316,8 +315,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
// Perform delete
Date updateTime = new Date();
updateEntity(theRequest, null, entity, updateTime, updateTime);
updateEntityForDelete(theRequest, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
@ -558,10 +557,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
throw new PreconditionFailedException("Can not perform version-specific expunge of resource " + theId.toUnqualified().getValue() + " as this is the current version");
}
return doExpunge(getResourceName(), entity.getResourceId(), entity.getVersion(), theExpungeOptions);
return myExpungeService.expunge(getResourceName(), entity.getResourceId(), entity.getVersion(), theExpungeOptions);
}
return doExpunge(getResourceName(), entity.getResourceId(), null, theExpungeOptions);
return myExpungeService.expunge(getResourceName(), entity.getResourceId(), null, theExpungeOptions);
}
@Override
@ -569,7 +568,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
ourLog.info("Beginning TYPE[{}] expunge operation", getResourceName());
return doExpunge(getResourceName(), null, null, theExpungeOptions);
return myExpungeService.expunge(getResourceName(), null, null, theExpungeOptions);
}
@Override

View File

@ -1,7 +1,5 @@
package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
@ -13,7 +11,6 @@ import ca.uhn.fhir.util.StopWatch;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@ -22,7 +19,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
/*
* #%L
@ -54,7 +50,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
@Override
@Transactional(propagation = Propagation.NEVER)
public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
return doExpunge(null, null, null, theExpungeOptions);
return myExpungeService.expunge(null, null, null, theExpungeOptions);
}
@Transactional(propagation = Propagation.REQUIRED)

View File

@ -73,6 +73,7 @@ public class DaoConfig {
Bundle.BundleType.MESSAGE.toCode()
)));
private static final Logger ourLog = LoggerFactory.getLogger(DaoConfig.class);
private static final int DEFAULT_EXPUNGE_BATCH_SIZE = 800;
private IndexEnabledEnum myIndexMissingFieldsEnabled = IndexEnabledEnum.DISABLED;
/**
@ -134,7 +135,9 @@ public class DaoConfig {
private IdStrategyEnum myResourceServerIdStrategy = IdStrategyEnum.SEQUENTIAL_NUMERIC;
private boolean myMarkResourcesForReindexingUponSearchParameterChange;
private boolean myExpungeEnabled;
private int myExpungeBatchSize = DEFAULT_EXPUNGE_BATCH_SIZE;
private int myReindexThreadCount;
private int myExpungeThreadCount;
private Set<String> myBundleTypesAllowedForStorage;
private boolean myValidateSearchParameterExpressionsOnSave = true;
private List<Integer> mySearchPreFetchThresholds = Arrays.asList(500, 2000, -1);
@ -153,6 +156,7 @@ public class DaoConfig {
setSubscriptionPurgeInactiveAfterMillis(Long.MAX_VALUE);
setMarkResourcesForReindexingUponSearchParameterChange(true);
setReindexThreadCount(Runtime.getRuntime().availableProcessors());
setExpungeThreadCount(Runtime.getRuntime().availableProcessors());
setBundleTypesAllowedForStorage(DEFAULT_BUNDLE_TYPES_ALLOWED_FOR_STORAGE);
if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) {
@ -601,6 +605,31 @@ public class DaoConfig {
myReindexThreadCount = Math.max(myReindexThreadCount, 1); // Minimum of 1
}
/**
* This setting controls the number of threads allocated to the expunge operation
* <p>
* The default value is set to the number of available processors
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
* for this setting must be a positive integer.
* </p>
*/
public int getExpungeThreadCount() {
return myExpungeThreadCount;
}
/**
* This setting controls the number of threads allocated to the expunge operation
* <p>
* The default value is set to the number of available processors
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
* for this setting must be a positive integer.
* </p>
*/
public void setExpungeThreadCount(int theExpungeThreadCount) {
myExpungeThreadCount = theExpungeThreadCount;
myExpungeThreadCount = Math.max(myExpungeThreadCount, 1); // Minimum of 1
}
public ResourceEncodingEnum getResourceEncoding() {
return myResourceEncoding;
}
@ -1048,6 +1077,22 @@ public class DaoConfig {
myExpungeEnabled = theExpungeEnabled;
}
/**
* The expunge batch size (default 800) determines the number of records deleted within a single transaction by the
* expunge operation.
*/
public void setExpungeBatchSize(int theExpungeBatchSize) {
myExpungeBatchSize = theExpungeBatchSize;
}
/**
* The expunge batch size (default 800) determines the number of records deleted within a single transaction by the
* expunge operation.
*/
public int getExpungeBatchSize() {
return myExpungeBatchSize;
}
/**
* Should contained IDs be indexed the same way that non-contained IDs are (default is
* <code>true</code>)

View File

@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamCoords;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface IResourceIndexedSearchParamCoordsDao extends JpaRepository<ResourceIndexedSearchParamCoords, Long> {
// nothing yet
@Modifying
@Query("delete from ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamDate;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface IResourceIndexedSearchParamDateDao extends JpaRepository<ResourceIndexedSearchParamDate, Long> {
// nothing yet
@Modifying
@Query("delete from ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamNumber;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface IResourceIndexedSearchParamNumberDao extends JpaRepository<ResourceIndexedSearchParamNumber, Long> {
// nothing yet
@Modifying
@Query("delete from ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantity;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface IResourceIndexedSearchParamQuantityDao extends JpaRepository<ResourceIndexedSearchParamQuantity, Long> {
// nothing yet
@Modifying
@Query("delete from ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -31,4 +32,7 @@ public interface IResourceIndexedSearchParamStringDao extends JpaRepository<Reso
@Query("select count(*) from ResourceIndexedSearchParamString t WHERE t.myResourcePid = :resid")
int countForResourceId(@Param("resid") Long theResourcePid);
@Modifying
@Query("delete from ResourceIndexedSearchParamString t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -30,4 +31,7 @@ public interface IResourceIndexedSearchParamTokenDao extends JpaRepository<Resou
@Query("select count(*) from ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :resid")
int countForResourceId(@Param("resid") Long theResourcePid);
@Modifying
@Query("delete from ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,6 +23,7 @@ import java.util.Collection;
*/
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -33,4 +34,7 @@ public interface IResourceIndexedSearchParamUriDao extends JpaRepository<Resourc
@Query("SELECT DISTINCT p.myUri FROM ResourceIndexedSearchParamUri p WHERE p.myResourceType = :resource_type AND p.myParamName = :param_name")
public Collection<String> findAllByResourceTypeAndParamName(@Param("resource_type") String theResourceType, @Param("param_name") String theParamName);
@Modifying
@Query("delete from ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,9 +23,13 @@ package ca.uhn.fhir.jpa.dao.data;
import org.springframework.data.jpa.repository.JpaRepository;
import ca.uhn.fhir.jpa.model.entity.ResourceLink;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface IResourceLinkDao extends JpaRepository<ResourceLink, Long> {
@Modifying
@Query("delete from ResourceLink t WHERE t.mySourceResourcePid = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);
}

View File

@ -23,6 +23,7 @@ import java.util.Collection;
*/
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -34,4 +35,7 @@ public interface IResourceTagDao extends JpaRepository<ResourceTag, Long> {
"INNER JOIN TagDefinition td ON (td.myId = t.myTagId) " +
"WHERE t.myResourceId in (:pids)")
Collection<ResourceTag> findByResourceIds(@Param("pids") Collection<Long> pids);
}
@Modifying
@Query("delete from ResourceTag t WHERE t.myResourceId = :resid")
void deleteByResourceId(@Param("resid") Long theResourcePid);}

View File

@ -0,0 +1,99 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.model.entity.*;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class ExpungeEverythingService {
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeEverythingService.class);
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
protected EntityManager myEntityManager;
void expungeEverything() {
final AtomicInteger counter = new AtomicInteger();
ourLog.info("BEGINNING GLOBAL $expunge");
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d"));
for (TermCodeSystem next : myEntityManager.createQuery("SELECT c FROM " + TermCodeSystem.class.getName() + " c", TermCodeSystem.class).getResultList()) {
next.setCurrentVersion(null);
myEntityManager.merge(next);
}
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d"));
return null;
});
ourLog.info("COMPLETED GLOBAL $expunge - Deleted {} rows", counter.get());
}
private int doExpungeEverythingQuery(String theQuery) {
StopWatch sw = new StopWatch();
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
return outcome;
}
}

View File

@ -0,0 +1,111 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Scope("prototype")
public class ExpungeRun implements Callable<ExpungeOutcome> {
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@Autowired
private IResourceExpungeService myExpungeDaoService;
@Autowired
private PartitionRunner myPartitionRunner;
private final String myResourceName;
private final Long myResourceId;
private final Long myVersion;
private final ExpungeOptions myExpungeOptions;
private final AtomicInteger myRemainingCount;
public ExpungeRun(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
myResourceName = theResourceName;
myResourceId = theResourceId;
myVersion = theVersion;
myExpungeOptions = theExpungeOptions;
myRemainingCount = new AtomicInteger(myExpungeOptions.getLimit());
}
@Override
public ExpungeOutcome call() {
if (myExpungeOptions.isExpungeDeletedResources() && myVersion == null) {
expungeDeletedResources();
if (expungeLimitReached()) {
return expungeOutcome();
}
}
if (myExpungeOptions.isExpungeOldVersions()) {
expungeOldVersions();
if (expungeLimitReached()) {
return expungeOutcome();
}
}
return expungeOutcome();
}
private void expungeDeletedResources() {
Slice<Long> resourceIds = findHistoricalVersionsOfDeletedResources();
deleteSearchResultCacheEntries(resourceIds);
deleteHistoricalVersions(resourceIds);
if (expungeLimitReached()) {
return;
}
deleteCurrentVersionsOfDeletedResources(resourceIds);
}
private Slice<Long> findHistoricalVersionsOfDeletedResources() {
return myExpungeDaoService.findHistoricalVersionsOfDeletedResources(myResourceName, myResourceId, myRemainingCount.get());
}
private Slice<Long> findHistoricalVersionsOfNonDeletedResources() {
return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources(myResourceName, myResourceId, myVersion, myRemainingCount.get());
}
private boolean expungeLimitReached() {
boolean expungeLimitReached = myRemainingCount.get() <= 0;
if (expungeLimitReached) {
ourLog.debug("Expunge limit has been hit - Stopping operation");
}
return expungeLimitReached;
}
private void expungeOldVersions() {
Slice<Long> historicalIds = findHistoricalVersionsOfNonDeletedResources();
myPartitionRunner.runInPartitionedThreads(historicalIds, partition -> myExpungeDaoService.expungeHistoricalVersions(partition, myRemainingCount));
}
private void deleteCurrentVersionsOfDeletedResources(Slice<Long> theResourceIds) {
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeCurrentVersionOfResources(partition, myRemainingCount));
}
private void deleteHistoricalVersions(Slice<Long> theResourceIds) {
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds(partition, myRemainingCount));
}
private void deleteSearchResultCacheEntries(Slice<Long> theResourceIds) {
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.deleteByResourceIdPartitions(partition));
}
private ExpungeOutcome expungeOutcome() {
return new ExpungeOutcome().setDeletedCount(myExpungeOptions.getLimit() - myRemainingCount.get());
}
}

View File

@ -0,0 +1,52 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.stereotype.Service;
@Service
public abstract class ExpungeService {
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
@Autowired
private DaoConfig myConfig;
@Autowired
private ExpungeEverythingService myExpungeEverythingService;
@Autowired
private IResourceExpungeService myExpungeDaoService;
@Lookup
protected abstract ExpungeRun getExpungeRun(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions);
public ExpungeOutcome expunge(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
ourLog.info("Expunge: ResourceName[{}] Id[{}] Version[{}] Options[{}]", theResourceName, theResourceId, theVersion, theExpungeOptions);
if (!myConfig.isExpungeEnabled()) {
throw new MethodNotAllowedException("$expunge is not enabled on this server");
}
if (theExpungeOptions.getLimit() < 1) {
throw new InvalidRequestException("Expunge limit may not be less than 1. Received expunge limit " + theExpungeOptions.getLimit() + ".");
}
if (theResourceName == null && theResourceId == null && theVersion == null) {
if (theExpungeOptions.isExpungeEverything()) {
myExpungeEverythingService.expungeEverything();
}
}
ExpungeRun expungeRun = getExpungeRun(theResourceName, theResourceId, theVersion, theExpungeOptions);
return expungeRun.call();
}
public void deleteAllSearchParams(Long theResourceId) {
myExpungeDaoService.deleteAllSearchParams(theResourceId);
}
}

View File

@ -0,0 +1,22 @@
package ca.uhn.fhir.jpa.dao.expunge;
import org.springframework.data.domain.Slice;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public interface IResourceExpungeService {
Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theI);
Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theI);
void expungeHistoricalVersions(List<Long> thePartition, AtomicInteger theRemainingCount);
void expungeCurrentVersionOfResources(List<Long> thePartition, AtomicInteger theRemainingCount);
void expungeHistoricalVersionsOfIds(List<Long> thePartition, AtomicInteger theRemainingCount);
void deleteByResourceIdPartitions(List<Long> thePartition);
void deleteAllSearchParams(Long theResourceId);
}

View File

@ -0,0 +1,105 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
@Service
public class PartitionRunner {
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
private static final int MAX_POOL_SIZE = 1000;
private final DaoConfig myDaoConfig;
@Autowired
public PartitionRunner(DaoConfig theDaoConfig) {
myDaoConfig = theDaoConfig;
}
void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
List<Callable<Void>> callableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
if (callableTasks.size() == 0) {
return;
}
ExecutorService executorService = buildExecutor(callableTasks.size());
try {
List<Future<Void>> futures = executorService.invokeAll(callableTasks);
// wait for all the threads to finish
for (Future<Void> future : futures) {
future.get();
}
} catch (InterruptedException e) {
ourLog.error("Interrupted while expunging.", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
ourLog.error("Error while expunging.", e);
throw new InternalErrorException(e);
} finally {
executorService.shutdown();
}
}
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
List<Callable<Void>> retval = new ArrayList<>();
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myDaoConfig.getExpungeBatchSize());
for (List<Long> nextPartition : partitions) {
Callable<Void> callableTask = () -> {
ourLog.info("Expunging any search results pointing to {} resources", nextPartition.size());
partitionConsumer.accept(nextPartition);
return null;
};
retval.add(callableTask);
}
return retval;
}
private ExecutorService buildExecutor(int numberOfTasks) {
int threadCount = Math.min(numberOfTasks, myDaoConfig.getExpungeThreadCount());
assert (threadCount > 0);
ourLog.info("Expunging with {} threads", threadCount);
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("expunge-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
ourLog.info("Note: Expunge executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + e.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};
return new ThreadPoolExecutor(
threadCount,
MAX_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
}

View File

@ -0,0 +1,202 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@Service
class ResourceExpungeService implements IResourceExpungeService {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceExpungeService.class);
@Autowired
private IResourceTableDao myResourceTableDao;
@Autowired
private ISearchResultDao mySearchResultDao;
@Autowired
private IResourceHistoryTableDao myResourceHistoryTableDao;
@Autowired
private IResourceIndexedSearchParamUriDao myResourceIndexedSearchParamUriDao;
@Autowired
private IResourceIndexedSearchParamStringDao myResourceIndexedSearchParamStringDao;
@Autowired
private IResourceIndexedSearchParamTokenDao myResourceIndexedSearchParamTokenDao;
@Autowired
private IResourceIndexedSearchParamDateDao myResourceIndexedSearchParamDateDao;
@Autowired
private IResourceIndexedSearchParamQuantityDao myResourceIndexedSearchParamQuantityDao;
@Autowired
private IResourceIndexedSearchParamCoordsDao myResourceIndexedSearchParamCoordsDao;
@Autowired
private IResourceIndexedSearchParamNumberDao myResourceIndexedSearchParamNumberDao;
@Autowired
private IResourceLinkDao myResourceLinkDao;
@Autowired
private IResourceTagDao myResourceTagDao;
@Autowired
private IdHelperService myIdHelperService;
@Autowired
private IResourceHistoryTagDao myResourceHistoryTagDao;
@Override
@Transactional
public Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theRemainingCount) {
Pageable page = PageRequest.of(0, theRemainingCount);
if (theResourceId != null) {
if (theVersion != null) {
return toSlice(myResourceHistoryTableDao.findForIdAndVersion(theResourceId, theVersion));
} else {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
}
} else {
if (theResourceName != null) {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
} else {
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
}
}
}
@Override
@Transactional
public Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theRemainingCount) {
Pageable page = PageRequest.of(0, theRemainingCount);
if (theResourceId != null) {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
return ids;
} else {
if (theResourceName != null) {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
return ids;
} else {
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResources(page);
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements());
return ids;
}
}
}
@Override
@Transactional
public void expungeCurrentVersionOfResources(List<Long> theResourceIds, AtomicInteger theRemainingCount) {
for (Long next : theResourceIds) {
expungeCurrentVersionOfResource(next, theRemainingCount);
if (theRemainingCount.get() <= 0) {
return;
}
}
}
private void expungeHistoricalVersion(Long theNextVersionId) {
ResourceHistoryTable version = myResourceHistoryTableDao.findById(theNextVersionId).orElseThrow(IllegalArgumentException::new);
ourLog.info("Deleting resource version {}", version.getIdDt().getValue());
myResourceHistoryTagDao.deleteAll(version.getTags());
myResourceHistoryTableDao.delete(version);
}
@Override
@Transactional
public void expungeHistoricalVersionsOfIds(List<Long> theResourceIds, AtomicInteger theRemainingCount) {
for (Long next : theResourceIds) {
expungeHistoricalVersionsOfId(next, theRemainingCount);
if (theRemainingCount.get() <= 0) {
return;
}
}
}
@Override
@Transactional
public void expungeHistoricalVersions(List<Long> theHistoricalIds, AtomicInteger theRemainingCount) {
for (Long next : theHistoricalIds) {
expungeHistoricalVersion(next);
if (theRemainingCount.decrementAndGet() <= 0) {
return;
}
}
}
private void expungeCurrentVersionOfResource(Long myResourceId, AtomicInteger theRemainingCount) {
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalStateException::new);
ResourceHistoryTable currentVersion = myResourceHistoryTableDao.findForIdAndVersion(resource.getId(), resource.getVersion());
if (currentVersion != null) {
expungeHistoricalVersion(currentVersion.getId());
}
ourLog.info("Expunging current version of resource {}", resource.getIdDt().getValue());
deleteAllSearchParams(resource.getResourceId());
resource.getTags().clear();
if (resource.getForcedId() != null) {
ForcedId forcedId = resource.getForcedId();
resource.setForcedId(null);
myResourceTableDao.saveAndFlush(resource);
myIdHelperService.delete(forcedId);
}
myResourceTableDao.delete(resource);
theRemainingCount.decrementAndGet();
}
@Override
@Transactional
public void deleteAllSearchParams(Long theResourceId) {
myResourceIndexedSearchParamUriDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamCoordsDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamDateDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamNumberDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamQuantityDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamStringDao.deleteByResourceId(theResourceId);
myResourceIndexedSearchParamTokenDao.deleteByResourceId(theResourceId);
myResourceLinkDao.deleteByResourceId(theResourceId);
myResourceTagDao.deleteByResourceId(theResourceId);
}
private void expungeHistoricalVersionsOfId(Long myResourceId, AtomicInteger theRemainingCount) {
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new);
Pageable page = PageRequest.of(0, theRemainingCount.get());
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
for (Long nextVersionId : versionIds) {
expungeHistoricalVersion(nextVersionId);
if (theRemainingCount.decrementAndGet() <= 0) {
return;
}
}
}
@Override
@Transactional
public void deleteByResourceIdPartitions(List<Long> theResourceIds) {
mySearchResultDao.deleteByResourceIds(theResourceIds);
}
private Slice<Long> toSlice(ResourceHistoryTable myVersion) {
Validate.notNull(myVersion);
return new SliceImpl<>(Collections.singletonList(myVersion.getId()));
}
}

View File

@ -26,7 +26,10 @@ import net.ttddyy.dsproxy.proxy.ParameterSetOperation;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.hibernate.engine.jdbc.internal.BasicFormatterImpl;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.trim;
@ -55,7 +58,9 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
for (QueryInfo next : theQueryInfoList) {
String sql = trim(next.getQuery());
List<String> params;
int size = 0;
if (next.getParametersList().size() > 0 && next.getParametersList().get(0).size() > 0) {
size = next.getParametersList().size();
List<ParameterSetOperation> values = next
.getParametersList()
.get(0);
@ -74,7 +79,7 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
long elapsedTime = theExecutionInfo.getElapsedTime();
long startTime = System.currentTimeMillis() - elapsedTime;
queryList.add(new Query(sql, params, startTime, elapsedTime, stackTraceElements));
queryList.add(new Query(sql, params, startTime, elapsedTime, stackTraceElements, size));
}
}
@ -87,13 +92,15 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
private final long myQueryTimestamp;
private final long myElapsedTime;
private final StackTraceElement[] myStackTrace;
private final int mySize;
Query(String theSql, List<String> theParams, long theQueryTimestamp, long theElapsedTime, StackTraceElement[] theStackTraceElements) {
Query(String theSql, List<String> theParams, long theQueryTimestamp, long theElapsedTime, StackTraceElement[] theStackTraceElements, int theSize) {
mySql = theSql;
myParams = Collections.unmodifiableList(theParams);
myQueryTimestamp = theQueryTimestamp;
myElapsedTime = theElapsedTime;
myStackTrace = theStackTraceElements;
mySize = theSize;
}
public long getQueryTimestamp() {
@ -121,7 +128,6 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
if (theInlineParams) {
List<String> nextParams = new ArrayList<>(myParams);
int idx = 0;
while (nextParams.size() > 0) {
idx = retVal.indexOf("?", idx);
@ -134,6 +140,9 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
}
}
if (mySize > 1) {
retVal += "\nsize: " + mySize + "\n";
}
return trim(retVal);
}
@ -141,6 +150,10 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
public StackTraceElement[] getStackTrace() {
return myStackTrace;
}
public int getSize() {
return mySize;
}
}
}

View File

@ -211,6 +211,39 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
ourLog.info("Insert Queries:\n{}", String.join("\n", queries));
}
/**
* Log all captured INSERT queries
*/
public void logUpdateQueries() {
List<String> queries = getUpdateQueries()
.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Update Queries:\n{}", String.join("\n", queries));
}
/**
* Log all captured DELETE queries
*/
public void logDeleteQueriesForCurrentThread() {
List<String> queries = getDeleteQueriesForCurrentThread()
.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Delete Queries:\n{}", String.join("\n", queries));
}
/**
* Log all captured DELETE queries
*/
public void logDeleteQueries() {
List<String> queries = getDeleteQueries()
.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Delete Queries:\n{}", String.join("\n", queries));
}
public int countSelectQueries() {
return getSelectQueries().size();
}

View File

@ -0,0 +1,170 @@
package ca.uhn.fhir.jpa.dao.expunge;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isOneOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {TestDstu3Config.class})
public class PartitionRunnerTest {
private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunnerTest.class);
private static final String EXPUNGE_THREADNAME_1 = "expunge-1";
private static final String EXPUNGE_THREADNAME_2 = "expunge-2";
@Autowired
private PartitionRunner myPartitionRunner;
@Autowired
private DaoConfig myDaoConfig;
private PointcutLatch myLatch = new PointcutLatch("partition call");
@After
public void before() {
myDaoConfig.setExpungeThreadCount(new DaoConfig().getExpungeThreadCount());
myDaoConfig.setExpungeBatchSize(new DaoConfig().getExpungeBatchSize());
myLatch.clear();
}
@Test
public void emptyList() {
Slice<Long> resourceIds = buildSlice(0);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(0);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
myLatch.clear();
}
private Slice<Long> buildSlice(int size) {
List<Long> list = new ArrayList<>();
for (long i = 0; i < size; ++i) {
list.add(i + 1);
}
return new SliceImpl(list);
}
@Test
public void oneItem() throws InterruptedException {
Slice<Long> resourceIds = buildSlice(1);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(1);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(myLatch.awaitExpected());
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
assertEquals(1, partitionCall.size);
}
@Test
public void twoItems() throws InterruptedException {
Slice<Long> resourceIds = buildSlice(2);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(1);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(myLatch.awaitExpected());
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
assertEquals(2, partitionCall.size);
}
@Test
public void tenItemsBatch5() throws InterruptedException {
Slice<Long> resourceIds = buildSlice(10);
myDaoConfig.setExpungeBatchSize(5);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(2);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
assertEquals(5, partitionCall1.size);
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
assertEquals(5, partitionCall2.size);
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}
@Test
public void nineItemsBatch5() throws InterruptedException {
Slice<Long> resourceIds = buildSlice(9);
myDaoConfig.setExpungeBatchSize(5);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(2);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
assertEquals(5, partitionCall1.size);
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
assertEquals(4, partitionCall2.size);
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}
@Test
public void tenItemsOneThread() throws InterruptedException {
Slice<Long> resourceIds = buildSlice(10);
myDaoConfig.setExpungeBatchSize(5);
myDaoConfig.setExpungeThreadCount(1);
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
myLatch.setExpectedCount(2);
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
{
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
assertEquals(5, partitionCall.size);
}
{
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
assertEquals(5, partitionCall.size);
}
}
private Consumer<List<Long>> buildPartitionConsumer(PointcutLatch latch) {
return list -> latch.call(new PartitionCall(Thread.currentThread().getName(), list.size()));
}
static class PartitionCall {
private final String threadName;
private final int size;
PartitionCall(String theThreadName, int theSize) {
threadName = theThreadName;
size = theSize;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myThreadName", threadName)
.append("mySize", size)
.toString();
}
}
}

View File

@ -0,0 +1,45 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.jpa.util.BaseCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
public class ResourceProviderDeleteSqlDstu3Test extends BaseResourceProviderDstu3Test {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceProviderDeleteSqlDstu3Test.class);
@Autowired
protected CircularQueueCaptureQueriesListener myCaptureQueriesListener;
@Test
public void testDeleteFortyTokensWithOneCommand() {
Observation o = new Observation();
o.setStatus(Observation.ObservationStatus.FINAL);
for (int i = 0; i < 40; ++i) {
CodeableConcept code = new CodeableConcept();
code.addCoding().setSystem("foo").setCode("Code" + i);
o.getCategory().add(code);
}
IIdType observationId = myObservationDao.create(o).getId();
myCaptureQueriesListener.clear();
myObservationDao.delete(observationId);
myCaptureQueriesListener.logDeleteQueries();
long deleteCount = myCaptureQueriesListener.getDeleteQueries()
.stream()
.filter(query -> query.getSql(false, false).contains("HFJ_SPIDX_TOKEN"))
.collect(Collectors.summarizingInt(BaseCaptureQueriesListener.Query::getSize))
.getSum();
assertEquals(1, deleteCount);
}
}

View File

@ -1,23 +1,63 @@
package ca.uhn.fhir.jpa.provider.r4;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.Matchers.stringContainsInOrder;
import static org.junit.Assert.*;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.jpa.util.TestUtil;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.model.primitive.UriDt;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.*;
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.api.IHttpResponse;
import ca.uhn.fhir.rest.client.interceptor.CapturingInterceptor;
import ca.uhn.fhir.rest.gclient.StringClientParam;
import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.*;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicNameValuePair;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.r4.hapi.validation.FhirInstanceValidator;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.Bundle.*;
import org.hl7.fhir.r4.model.Encounter.EncounterLocationComponent;
import org.hl7.fhir.r4.model.Encounter.EncounterStatus;
import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
import org.hl7.fhir.r4.model.Narrative.NarrativeStatus;
import org.hl7.fhir.r4.model.Observation.ObservationStatus;
import org.hl7.fhir.r4.model.Questionnaire.QuestionnaireItemType;
import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType;
import org.hl7.fhir.r4.model.Subscription.SubscriptionStatus;
import org.junit.*;
import org.springframework.test.util.AopTestUtils;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.io.BufferedReader;
import java.io.IOException;
@ -27,97 +67,12 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import java.util.stream.Collectors;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.TestUtil;
import ca.uhn.fhir.rest.api.*;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicNameValuePair;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.r4.hapi.validation.FhirInstanceValidator;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleLinkComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.hl7.fhir.r4.model.Bundle.SearchEntryMode;
import org.hl7.fhir.r4.model.Encounter.EncounterLocationComponent;
import org.hl7.fhir.r4.model.Encounter.EncounterStatus;
import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
import org.hl7.fhir.r4.model.Narrative.NarrativeStatus;
import org.hl7.fhir.r4.model.Observation.ObservationStatus;
import org.hl7.fhir.r4.model.Questionnaire.QuestionnaireItemType;
import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType;
import org.hl7.fhir.r4.model.Subscription.SubscriptionStatus;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.test.util.AopTestUtils;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.model.primitive.UriDt;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.api.IHttpResponse;
import ca.uhn.fhir.rest.client.interceptor.CapturingInterceptor;
import ca.uhn.fhir.rest.gclient.StringClientParam;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.NumberParam;
import ca.uhn.fhir.rest.param.ParamPrefixEnum;
import ca.uhn.fhir.rest.param.StringAndListParam;
import ca.uhn.fhir.rest.param.StringOrListParam;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
@SuppressWarnings("Duplicates")
public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
@ -2035,6 +1990,8 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
}
assertThat(ids, hasItem(id.getIdPart()));
// TODO KHS this fails intermittently with 53 instead of 77
assertEquals(LARGE_NUMBER, ids.size());
for (int i = 1; i < LARGE_NUMBER; i++) {
assertThat(ids.size() + " ids: " + ids, ids, hasItem("A" + StringUtils.leftPad(Integer.toString(i), 2, '0')));

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module;
package ca.uhn.fhir.jpa.model.concurrency;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -1,9 +1,10 @@
package ca.uhn.fhir.jpa.subscription.module;
package ca.uhn.fhir.jpa.model.concurrency;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,10 +16,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
// TODO KHS copy this version over to hapi-fhir
// This class is primarily used for testing.
public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class);
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
@ -77,7 +75,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
List<HookParams> retval = myCalledWith.get();
try {
assertNotNull(getName() + " awaitExpected() called before setExpected() called.", myCountdownLatch);
Validate.notNull(myCountdownLatch, getName() + " awaitExpected() called before setExpected() called.");
if (!myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)) {
throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + myCountdownLatch.getCount() + ".");
}
@ -97,7 +95,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
} finally {
clear();
}
assertEquals("Concurrency error: Latch switched while waiting.", retval, myCalledWith.get());
Validate.isTrue(retval.equals(myCalledWith.get()), "Concurrency error: Latch switched while waiting.");
return retval;
}
@ -173,10 +171,15 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
}
public static Object getLatchInvocationParameter(List<HookParams> theHookParams) {
assertNotNull(theHookParams);
assertEquals("Expected Pointcut to be invoked 1 time", 1, theHookParams.size());
HookParams arg = theHookParams.get(0);
assertEquals("Expected pointcut to be invoked with 1 argument", 1, arg.values().size());
Validate.notNull(theHookParams);
Validate.isTrue(theHookParams.size() == 1, "Expected Pointcut to be invoked 1 time");
return getLatchInvocationParameter(theHookParams, 0);
}
public static Object getLatchInvocationParameter(List<HookParams> theHookParams, int index) {
Validate.notNull(theHookParams);
HookParams arg = theHookParams.get(index);
Validate.isTrue(arg.values().size() == 1, "Expected pointcut to be invoked with 1 argument");
return arg.values().iterator().next();
}
}

View File

@ -4,9 +4,9 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.concurrency.IPointcutLatch;
import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.IPointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;

View File

@ -1,12 +1,11 @@
package ca.uhn.fhirtest.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.interceptor.LoggingInterceptor;
import ca.uhn.fhirtest.interceptor.AnalyticsInterceptor;
import ca.uhn.fhirtest.joke.HolyFooCowInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CommonConfig {

View File

@ -181,6 +181,10 @@
<action type="fix">
Ensure that database cursors are closed immediately after performing a FHIR search.
</action>
<action type="add">
Expunges are now done in batches in multiple threads. Both the number of expunge threads and batch size are configurable
in DaoConfig.
</action>
</release>
<release version="3.7.0" date="2019-02-06" description="Gale">
<action type="add">