Merge branch 'master' of github.com:jamesagnew/hapi-fhir
This commit is contained in:
commit
828df1528f
|
@ -11,9 +11,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
|||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.hl7.fhir.instance.model.api.*;
|
||||
|
@ -50,7 +51,7 @@ public class BundleUtil {
|
|||
BaseRuntimeChildDefinition relChild = relDef.getChildByName("relation");
|
||||
List<IBase> relValues = relChild.getAccessor().getValues(nextLink);
|
||||
for (IBase next : relValues) {
|
||||
IPrimitiveType<?> nextValue = (IPrimitiveType<?>)next;
|
||||
IPrimitiveType<?> nextValue = (IPrimitiveType<?>) next;
|
||||
if (theLinkRelation.equals(nextValue.getValueAsString())) {
|
||||
isRightRel = true;
|
||||
}
|
||||
|
@ -64,7 +65,7 @@ public class BundleUtil {
|
|||
BaseRuntimeChildDefinition urlChild = linkDef.getChildByName("url");
|
||||
List<IBase> values = urlChild.getAccessor().getValues(nextLink);
|
||||
for (IBase nextUrl : values) {
|
||||
IPrimitiveType<?> nextValue = (IPrimitiveType<?>)nextUrl;
|
||||
IPrimitiveType<?> nextValue = (IPrimitiveType<?>) nextUrl;
|
||||
if (isNotBlank(nextValue.getValueAsString())) {
|
||||
return nextValue.getValueAsString();
|
||||
}
|
||||
|
@ -83,35 +84,35 @@ public class BundleUtil {
|
|||
|
||||
BaseRuntimeElementCompositeDefinition<?> entryChildElem = (BaseRuntimeElementCompositeDefinition<?>) entryChild.getChildByName("entry");
|
||||
BaseRuntimeChildDefinition resourceChild = entryChildElem.getChildByName("resource");
|
||||
|
||||
|
||||
BaseRuntimeChildDefinition requestChild = entryChildElem.getChildByName("request");
|
||||
BaseRuntimeElementCompositeDefinition<?> requestDef = (BaseRuntimeElementCompositeDefinition<?>) requestChild.getChildByName("request");
|
||||
|
||||
|
||||
BaseRuntimeChildDefinition urlChild = requestDef.getChildByName("url");
|
||||
|
||||
List<Pair<String, IBaseResource>> retVal = new ArrayList<>(entries.size());
|
||||
for (IBase nextEntry : entries) {
|
||||
|
||||
|
||||
String url = null;
|
||||
IBaseResource resource = null;
|
||||
|
||||
|
||||
for (IBase nextEntryValue : requestChild.getAccessor().getValues(nextEntry)) {
|
||||
for (IBase nextUrlValue : urlChild.getAccessor().getValues(nextEntryValue)) {
|
||||
url = ((IPrimitiveType<String>)nextUrlValue).getValue();
|
||||
url = ((IPrimitiveType<String>) nextUrlValue).getValue();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Should return 0..1 only
|
||||
for (IBase nextValue : resourceChild.getAccessor().getValues(nextEntry)) {
|
||||
resource = (IBaseResource) nextValue;
|
||||
}
|
||||
|
||||
|
||||
retVal.add(Pair.of(url, resource));
|
||||
}
|
||||
|
||||
return retVal;
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
||||
public static String getBundleType(FhirContext theContext, IBaseBundle theBundle) {
|
||||
RuntimeResourceDefinition def = theContext.getResourceDefinition(theBundle);
|
||||
BaseRuntimeChildDefinition entryChild = def.getChildByName("type");
|
||||
|
@ -147,13 +148,13 @@ public class BundleUtil {
|
|||
List<IBase> entries = entryChild.getAccessor().getValues(theBundle);
|
||||
|
||||
BaseRuntimeElementCompositeDefinition<?> entryChildElem = (BaseRuntimeElementCompositeDefinition<?>) entryChild.getChildByName("entry");
|
||||
|
||||
|
||||
BaseRuntimeChildDefinition resourceChild = entryChildElem.getChildByName("resource");
|
||||
BaseRuntimeChildDefinition requestChild = entryChildElem.getChildByName("request");
|
||||
BaseRuntimeElementCompositeDefinition<?> requestElem = (BaseRuntimeElementCompositeDefinition<?>) requestChild.getChildByName("request");
|
||||
BaseRuntimeElementCompositeDefinition<?> requestElem = (BaseRuntimeElementCompositeDefinition<?>) requestChild.getChildByName("request");
|
||||
BaseRuntimeChildDefinition urlChild = requestElem.getChildByName("url");
|
||||
BaseRuntimeChildDefinition methodChild = requestElem.getChildByName("method");
|
||||
|
||||
|
||||
for (IBase nextEntry : entries) {
|
||||
IBaseResource resource = null;
|
||||
String url = null;
|
||||
|
@ -164,39 +165,40 @@ public class BundleUtil {
|
|||
}
|
||||
for (IBase nextRequest : requestChild.getAccessor().getValues(nextEntry)) {
|
||||
for (IBase nextUrl : urlChild.getAccessor().getValues(nextRequest)) {
|
||||
url = ((IPrimitiveType<?>)nextUrl).getValueAsString();
|
||||
url = ((IPrimitiveType<?>) nextUrl).getValueAsString();
|
||||
}
|
||||
for (IBase nextUrl : methodChild.getAccessor().getValues(nextRequest)) {
|
||||
String methodString = ((IPrimitiveType<?>)nextUrl).getValueAsString();
|
||||
String methodString = ((IPrimitiveType<?>) nextUrl).getValueAsString();
|
||||
if (isNotBlank(methodString)) {
|
||||
requestType = RequestTypeEnum.valueOf(methodString);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
/*
|
||||
* All 3 might be null - That's ok because we still want to know the
|
||||
* order in the original bundle.
|
||||
*/
|
||||
retVal.add(new BundleEntryParts(requestType, url, resource));
|
||||
}
|
||||
|
||||
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Extract all of the resources from a given bundle
|
||||
*/
|
||||
public static List<IBaseResource> toListOfResources(FhirContext theContext, IBaseBundle theBundle) {
|
||||
return toListOfResourcesOfType(theContext, theBundle, null);
|
||||
return toListOfResourcesOfType(theContext, theBundle, IBaseResource.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract all of the resources of a given type from a given bundle
|
||||
* Extract all of the resources of a given type from a given bundle
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T extends IBaseResource> List<T> toListOfResourcesOfType(FhirContext theContext, IBaseBundle theBundle, Class<T> theTypeToInclude) {
|
||||
Objects.requireNonNull(theTypeToInclude, "ResourceType must not be null");
|
||||
List<T> retVal = new ArrayList<>();
|
||||
|
||||
RuntimeResourceDefinition def = theContext.getResourceDefinition(theBundle);
|
||||
|
@ -207,36 +209,36 @@ public class BundleUtil {
|
|||
BaseRuntimeChildDefinition resourceChild = entryChildElem.getChildByName("resource");
|
||||
for (IBase nextEntry : entries) {
|
||||
for (IBase next : resourceChild.getAccessor().getValues(nextEntry)) {
|
||||
if (theTypeToInclude != null && !theTypeToInclude.isAssignableFrom(next.getClass())) {
|
||||
continue;
|
||||
if (theTypeToInclude.isAssignableFrom(next.getClass())) {
|
||||
retVal.add((T) next);
|
||||
}
|
||||
retVal.add((T) next);
|
||||
}
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
public static class BundleEntryParts
|
||||
{
|
||||
public static class BundleEntryParts {
|
||||
private final RequestTypeEnum myRequestType;
|
||||
private final IBaseResource myResource;
|
||||
private final String myUrl;
|
||||
|
||||
BundleEntryParts(RequestTypeEnum theRequestType, String theUrl, IBaseResource theResource) {
|
||||
super();
|
||||
myRequestType = theRequestType;
|
||||
myUrl = theUrl;
|
||||
myResource = theResource;
|
||||
}
|
||||
|
||||
public RequestTypeEnum getRequestType() {
|
||||
return myRequestType;
|
||||
}
|
||||
|
||||
public IBaseResource getResource() {
|
||||
return myResource;
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return myUrl;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,10 +5,13 @@ import ca.uhn.fhir.interceptor.api.HookParams;
|
|||
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.jpa.dao.data.*;
|
||||
import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
|
||||
import ca.uhn.fhir.jpa.dao.index.DaoSearchParamSynchronizer;
|
||||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
||||
import ca.uhn.fhir.jpa.dao.index.SearchParamWithInlineReferencesExtractor;
|
||||
import ca.uhn.fhir.jpa.entity.*;
|
||||
import ca.uhn.fhir.jpa.entity.ResourceSearchView;
|
||||
import ca.uhn.fhir.jpa.entity.Search;
|
||||
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
|
||||
import ca.uhn.fhir.jpa.model.entity.*;
|
||||
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
|
||||
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
|
||||
|
@ -20,8 +23,6 @@ import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
|||
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
|
||||
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
|
||||
import ca.uhn.fhir.jpa.util.DeleteConflict;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOptions;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
import ca.uhn.fhir.model.api.IResource;
|
||||
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
|
||||
|
@ -41,10 +42,11 @@ import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
|||
import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.*;
|
||||
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
|
||||
import ca.uhn.fhir.rest.server.interceptor.IServerOperationInterceptor;
|
||||
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
||||
import ca.uhn.fhir.util.CoverageIgnore;
|
||||
import ca.uhn.fhir.util.OperationOutcomeUtil;
|
||||
|
@ -52,7 +54,6 @@ import ca.uhn.fhir.util.StopWatch;
|
|||
import ca.uhn.fhir.util.XmlUtil;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
|
@ -62,20 +63,16 @@ import org.hibernate.Session;
|
|||
import org.hibernate.internal.SessionImpl;
|
||||
import org.hl7.fhir.instance.model.api.*;
|
||||
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.data.domain.SliceImpl;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import javax.persistence.*;
|
||||
import javax.persistence.criteria.CriteriaBuilder;
|
||||
|
@ -86,7 +83,6 @@ import javax.xml.stream.events.Characters;
|
|||
import javax.xml.stream.events.XMLEvent;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.*;
|
||||
|
||||
|
@ -120,7 +116,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
public static final String OO_SEVERITY_ERROR = "error";
|
||||
public static final String OO_SEVERITY_INFO = "information";
|
||||
public static final String OO_SEVERITY_WARN = "warning";
|
||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirDao.class);
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(BaseHapiFhirDao.class);
|
||||
private static final Map<FhirVersionEnum, FhirContext> ourRetrievalContexts = new HashMap<>();
|
||||
private static final String PROCESSING_SUB_REQUEST = "BaseHapiFhirDao.processingSubRequest";
|
||||
private static boolean ourValidationDisabledForUnitTest;
|
||||
|
@ -133,54 +129,30 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
@Autowired
|
||||
protected IForcedIdDao myForcedIdDao;
|
||||
@Autowired
|
||||
protected ISearchResultDao mySearchResultDao;
|
||||
@Autowired(required = false)
|
||||
protected IFulltextSearchSvc myFulltextSearchSvc;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamUriDao myResourceIndexedSearchParamUriDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamStringDao myResourceIndexedSearchParamStringDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamTokenDao myResourceIndexedSearchParamTokenDao;
|
||||
@Autowired
|
||||
protected IResourceLinkDao myResourceLinkDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamDateDao myResourceIndexedSearchParamDateDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamQuantityDao myResourceIndexedSearchParamQuantityDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamCoordsDao myResourceIndexedSearchParamCoordsDao;
|
||||
@Autowired()
|
||||
protected IResourceIndexedSearchParamNumberDao myResourceIndexedSearchParamNumberDao;
|
||||
@Autowired
|
||||
protected ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||
@Autowired
|
||||
protected ISearchParamRegistry mySerarchParamRegistry;
|
||||
@Autowired()
|
||||
@Autowired
|
||||
protected IHapiTerminologySvc myTerminologySvc;
|
||||
@Autowired
|
||||
protected IResourceHistoryTableDao myResourceHistoryTableDao;
|
||||
@Autowired
|
||||
protected IResourceHistoryTagDao myResourceHistoryTagDao;
|
||||
@Autowired
|
||||
protected IResourceTableDao myResourceTableDao;
|
||||
@Autowired
|
||||
protected IResourceTagDao myResourceTagDao;
|
||||
@Autowired
|
||||
protected IResourceSearchViewDao myResourceViewDao;
|
||||
@Autowired
|
||||
protected ISearchParamRegistry mySearchParamRegistry;
|
||||
@Autowired(required = true)
|
||||
@Autowired
|
||||
private DaoConfig myConfig;
|
||||
private FhirContext myContext;
|
||||
|
||||
@Autowired
|
||||
private PlatformTransactionManager myPlatformTransactionManager;
|
||||
@Autowired
|
||||
private ISearchDao mySearchDao;
|
||||
@Autowired
|
||||
private ISearchParamPresenceSvc mySearchParamPresenceSvc;
|
||||
//@Autowired
|
||||
//private ISearchResultDao mySearchResultDao;
|
||||
@Autowired
|
||||
private DaoRegistry myDaoRegistry;
|
||||
@Autowired
|
||||
|
@ -189,11 +161,31 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
private DaoSearchParamSynchronizer myDaoSearchParamSynchronizer;
|
||||
@Autowired
|
||||
private SearchBuilderFactory mySearchBuilderFactory;
|
||||
@Autowired
|
||||
ExpungeService myExpungeService;
|
||||
|
||||
private FhirContext myContext;
|
||||
private ApplicationContext myApplicationContext;
|
||||
@Autowired
|
||||
protected IInterceptorBroadcaster myInterceptorBroadcaster;
|
||||
|
||||
@Autowired
|
||||
public void setContext(FhirContext theContext) {
|
||||
myContext = theContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext theApplicationContext) throws BeansException {
|
||||
/*
|
||||
* We do a null check here because Smile's module system tries to
|
||||
* initialize the application context twice if two modules depend on
|
||||
* the persistence module. The second time sets the dependency's appctx.
|
||||
*/
|
||||
if (myApplicationContext == null) {
|
||||
myApplicationContext = theApplicationContext;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the newly created forced ID. If the entity already had a forced ID, or if
|
||||
* none was created, returns null.
|
||||
|
@ -215,261 +207,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
return null;
|
||||
}
|
||||
|
||||
protected ExpungeOutcome doExpunge(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
|
||||
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
||||
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
|
||||
ourLog.info("Expunge: ResourceName[{}] Id[{}] Version[{}] Options[{}]", theResourceName, theResourceId, theVersion, theExpungeOptions);
|
||||
|
||||
if (!getConfig().isExpungeEnabled()) {
|
||||
throw new MethodNotAllowedException("$expunge is not enabled on this server");
|
||||
}
|
||||
|
||||
if (theExpungeOptions.getLimit() < 1) {
|
||||
throw new InvalidRequestException("Expunge limit may not be less than 1. Received expunge limit " + theExpungeOptions.getLimit() + ".");
|
||||
}
|
||||
|
||||
AtomicInteger remainingCount = new AtomicInteger(theExpungeOptions.getLimit());
|
||||
|
||||
if (theResourceName == null && theResourceId == null && theVersion == null) {
|
||||
if (theExpungeOptions.isExpungeEverything()) {
|
||||
doExpungeEverything();
|
||||
}
|
||||
}
|
||||
|
||||
if (theExpungeOptions.isExpungeDeletedResources() && theVersion == null) {
|
||||
|
||||
/*
|
||||
* Delete historical versions of deleted resources
|
||||
*/
|
||||
Pageable page = PageRequest.of(0, remainingCount.get());
|
||||
Slice<Long> resourceIds = txTemplate.execute(t -> {
|
||||
if (theResourceId != null) {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
|
||||
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
|
||||
return ids;
|
||||
} else {
|
||||
if (theResourceName != null) {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
|
||||
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
|
||||
return ids;
|
||||
} else {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResources(page);
|
||||
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements(), theResourceName);
|
||||
return ids;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
/*
|
||||
* Delete any search result cache entries pointing to the given resource. We do
|
||||
* this in batches to avoid sending giant batches of parameters to the DB
|
||||
*/
|
||||
List<List<Long>> partitions = Lists.partition(resourceIds.getContent(), 800);
|
||||
for (List<Long> nextPartition : partitions) {
|
||||
ourLog.info("Expunging any search results pointing to {} resources", nextPartition.size());
|
||||
txTemplate.execute(t -> {
|
||||
mySearchResultDao.deleteByResourceIds(nextPartition);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete historical versions
|
||||
*/
|
||||
for (Long next : resourceIds) {
|
||||
txTemplate.execute(t -> {
|
||||
expungeHistoricalVersionsOfId(next, remainingCount);
|
||||
return null;
|
||||
});
|
||||
if (remainingCount.get() <= 0) {
|
||||
ourLog.debug("Expunge limit has been hit - Stopping operation");
|
||||
return toExpungeOutcome(theExpungeOptions, remainingCount);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete current versions of deleted resources
|
||||
*/
|
||||
for (Long next : resourceIds) {
|
||||
txTemplate.execute(t -> {
|
||||
expungeCurrentVersionOfResource(next, remainingCount);
|
||||
return null;
|
||||
});
|
||||
if (remainingCount.get() <= 0) {
|
||||
ourLog.debug("Expunge limit has been hit - Stopping operation");
|
||||
return toExpungeOutcome(theExpungeOptions, remainingCount);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (theExpungeOptions.isExpungeOldVersions()) {
|
||||
|
||||
/*
|
||||
* Delete historical versions of non-deleted resources
|
||||
*/
|
||||
Pageable page = PageRequest.of(0, remainingCount.get());
|
||||
Slice<Long> historicalIds = txTemplate.execute(t -> {
|
||||
if (theResourceId != null) {
|
||||
if (theVersion != null) {
|
||||
return toSlice(myResourceHistoryTableDao.findForIdAndVersion(theResourceId, theVersion));
|
||||
} else {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
|
||||
}
|
||||
} else {
|
||||
if (theResourceName != null) {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
|
||||
} else {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (Long next : historicalIds) {
|
||||
txTemplate.execute(t -> {
|
||||
expungeHistoricalVersion(next);
|
||||
if (remainingCount.decrementAndGet() <= 0) {
|
||||
return toExpungeOutcome(theExpungeOptions, remainingCount);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
return toExpungeOutcome(theExpungeOptions, remainingCount);
|
||||
}
|
||||
|
||||
private void doExpungeEverything() {
|
||||
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
ourLog.info("BEGINNING GLOBAL $expunge");
|
||||
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d"));
|
||||
for (TermCodeSystem next : myEntityManager.createQuery("SELECT c FROM " + TermCodeSystem.class.getName() + " c", TermCodeSystem.class).getResultList()) {
|
||||
next.setCurrentVersion(null);
|
||||
myEntityManager.merge(next);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
|
||||
ourLog.info("COMPLETED GLOBAL $expunge - Deleted {} rows", counter.get());
|
||||
}
|
||||
|
||||
private int doExpungeEverythingQuery(String theQuery) {
|
||||
StopWatch sw = new StopWatch();
|
||||
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
|
||||
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
|
||||
return outcome;
|
||||
}
|
||||
|
||||
private void expungeCurrentVersionOfResource(Long theResourceId, AtomicInteger theRemainingCount) {
|
||||
ResourceTable resource = myResourceTableDao.findById(theResourceId).orElseThrow(IllegalStateException::new);
|
||||
|
||||
ResourceHistoryTable currentVersion = myResourceHistoryTableDao.findForIdAndVersion(resource.getId(), resource.getVersion());
|
||||
if (currentVersion != null) {
|
||||
expungeHistoricalVersion(currentVersion.getId());
|
||||
}
|
||||
|
||||
ourLog.info("Expunging current version of resource {}", resource.getIdDt().getValue());
|
||||
|
||||
myResourceIndexedSearchParamUriDao.deleteAll(resource.getParamsUri());
|
||||
myResourceIndexedSearchParamCoordsDao.deleteAll(resource.getParamsCoords());
|
||||
myResourceIndexedSearchParamDateDao.deleteAll(resource.getParamsDate());
|
||||
myResourceIndexedSearchParamNumberDao.deleteAll(resource.getParamsNumber());
|
||||
myResourceIndexedSearchParamQuantityDao.deleteAll(resource.getParamsQuantity());
|
||||
myResourceIndexedSearchParamStringDao.deleteAll(resource.getParamsString());
|
||||
myResourceIndexedSearchParamTokenDao.deleteAll(resource.getParamsToken());
|
||||
myResourceLinkDao.deleteAll(resource.getResourceLinks());
|
||||
myResourceLinkDao.deleteAll(resource.getResourceLinksAsTarget());
|
||||
|
||||
myResourceTagDao.deleteAll(resource.getTags());
|
||||
resource.getTags().clear();
|
||||
|
||||
if (resource.getForcedId() != null) {
|
||||
ForcedId forcedId = resource.getForcedId();
|
||||
resource.setForcedId(null);
|
||||
myResourceTableDao.saveAndFlush(resource);
|
||||
myIdHelperService.delete(forcedId);
|
||||
}
|
||||
|
||||
myResourceTableDao.delete(resource);
|
||||
|
||||
theRemainingCount.decrementAndGet();
|
||||
}
|
||||
|
||||
protected void expungeHistoricalVersion(Long theNextVersionId) {
|
||||
ResourceHistoryTable version = myResourceHistoryTableDao.findById(theNextVersionId).orElseThrow(IllegalArgumentException::new);
|
||||
ourLog.info("Deleting resource version {}", version.getIdDt().getValue());
|
||||
|
||||
myResourceHistoryTagDao.deleteAll(version.getTags());
|
||||
myResourceHistoryTableDao.delete(version);
|
||||
}
|
||||
|
||||
protected void expungeHistoricalVersionsOfId(Long theResourceId, AtomicInteger theRemainingCount) {
|
||||
ResourceTable resource = myResourceTableDao.findById(theResourceId).orElseThrow(IllegalArgumentException::new);
|
||||
|
||||
Pageable page = PageRequest.of(0, theRemainingCount.get());
|
||||
|
||||
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
|
||||
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
|
||||
for (Long nextVersionId : versionIds) {
|
||||
expungeHistoricalVersion(nextVersionId);
|
||||
if (theRemainingCount.decrementAndGet() <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void extractTagsHapi(IResource theResource, ResourceTable theEntity, Set<ResourceTag> allDefs) {
|
||||
TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource);
|
||||
if (tagList != null) {
|
||||
|
@ -602,11 +339,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
return myContext;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setContext(FhirContext theContext) {
|
||||
myContext = theContext;
|
||||
}
|
||||
|
||||
public FhirContext getContext(FhirVersionEnum theVersion) {
|
||||
Validate.notNull(theVersion, "theVersion must not be null");
|
||||
synchronized (ourRetrievalContexts) {
|
||||
|
@ -1055,18 +787,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext theApplicationContext) throws BeansException {
|
||||
/*
|
||||
* We do a null check here because Smile's module system tries to
|
||||
* initialize the application context twice if two modules depend on
|
||||
* the persistence module. The second time sets the dependency's appctx.
|
||||
*/
|
||||
if (myApplicationContext == null) {
|
||||
myApplicationContext = theApplicationContext;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is called when an update to an existing resource detects that the resource supplied for update is missing a tag/profile/security label that the currently persisted resource holds.
|
||||
* <p>
|
||||
|
@ -1117,11 +837,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
return false;
|
||||
}
|
||||
|
||||
private ExpungeOutcome toExpungeOutcome(ExpungeOptions theExpungeOptions, AtomicInteger theRemainingCount) {
|
||||
return new ExpungeOutcome()
|
||||
.setDeletedCount(theExpungeOptions.getLimit() - theRemainingCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public IBaseResource toResource(BaseHasResource theEntity, boolean theForHistoryOperation) {
|
||||
RuntimeResourceDefinition type = myContext.getResourceDefinition(theEntity.getResourceType());
|
||||
|
@ -1248,9 +963,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
return myContext.getResourceDefinition(theResource).getName();
|
||||
}
|
||||
|
||||
private Slice<Long> toSlice(ResourceHistoryTable theVersion) {
|
||||
Validate.notNull(theVersion);
|
||||
return new SliceImpl<>(Collections.singletonList(theVersion.getId()));
|
||||
protected ResourceTable updateEntityForDelete(RequestDetails theRequest, ResourceTable entity) {
|
||||
Date updateTime = new Date();
|
||||
return updateEntity(theRequest, null, entity, updateTime, true, true, updateTime, false, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -1284,14 +999,13 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
theEntity.setPublished(theUpdateTime);
|
||||
}
|
||||
|
||||
ResourceIndexedSearchParams existingParams = new ResourceIndexedSearchParams(theEntity);
|
||||
ResourceIndexedSearchParams existingParams = null;
|
||||
|
||||
ResourceIndexedSearchParams newParams = null;
|
||||
|
||||
EncodedResource changed;
|
||||
if (theDeletedTimestampOrNull != null) {
|
||||
|
||||
newParams = new ResourceIndexedSearchParams();
|
||||
// DELETE
|
||||
|
||||
theEntity.setDeleted(theDeletedTimestampOrNull);
|
||||
theEntity.setUpdated(theDeletedTimestampOrNull);
|
||||
|
@ -1302,7 +1016,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
changed = populateResourceIntoEntity(theRequest, theResource, theEntity, true);
|
||||
|
||||
} else {
|
||||
|
||||
// CREATE or UPDATE
|
||||
existingParams = new ResourceIndexedSearchParams(theEntity);
|
||||
theEntity.setDeleted(null);
|
||||
|
||||
if (thePerformIndexing) {
|
||||
|
@ -1392,7 +1107,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
* index table for resource links (reference indexes) because we index
|
||||
* those by path and not by parameter name.
|
||||
*/
|
||||
if (thePerformIndexing) {
|
||||
if (thePerformIndexing && newParams != null) {
|
||||
Map<String, Boolean> presentSearchParams = new HashMap<>();
|
||||
for (String nextKey : newParams.getPopulatedResourceLinkParameters()) {
|
||||
presentSearchParams.put(nextKey, Boolean.TRUE);
|
||||
|
@ -1412,8 +1127,12 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
* Indexing
|
||||
*/
|
||||
if (thePerformIndexing) {
|
||||
myDaoSearchParamSynchronizer.synchronizeSearchParamsToDatabase(newParams, theEntity, existingParams);
|
||||
mySearchParamWithInlineReferencesExtractor.storeCompositeStringUniques(newParams, theEntity, existingParams);
|
||||
if (newParams == null) {
|
||||
myExpungeService.deleteAllSearchParams(theEntity.getId());
|
||||
} else {
|
||||
myDaoSearchParamSynchronizer.synchronizeSearchParamsToDatabase(newParams, theEntity, existingParams);
|
||||
mySearchParamWithInlineReferencesExtractor.storeCompositeStringUniques(newParams, theEntity, existingParams);
|
||||
}
|
||||
}
|
||||
|
||||
if (theResource != null) {
|
||||
|
@ -1424,11 +1143,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
|
|||
return theEntity;
|
||||
}
|
||||
|
||||
protected ResourceTable updateEntity(RequestDetails theRequest, IBaseResource theResource, ResourceTable
|
||||
entity, Date theDeletedTimestampOrNull, Date theUpdateTime) {
|
||||
return updateEntity(theRequest, theResource, entity, theDeletedTimestampOrNull, true, true, theUpdateTime, false, true);
|
||||
}
|
||||
|
||||
public ResourceTable updateInternal(RequestDetails theRequestDetails, T theResource, boolean thePerformIndexing, boolean theForceUpdateVersion,
|
||||
ResourceTable theEntity, IIdType theResourceId, IBaseResource theOldResource) {
|
||||
|
||||
|
|
|
@ -90,12 +90,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
protected DaoConfig myDaoConfig;
|
||||
@Autowired
|
||||
private MatchResourceUrlService myMatchResourceUrlService;
|
||||
@Autowired
|
||||
private IResourceReindexingSvc myResourceReindexingSvc;
|
||||
|
||||
private String myResourceName;
|
||||
private Class<T> myResourceType;
|
||||
private String mySecondaryPrimaryKeyParamName;
|
||||
@Autowired
|
||||
private IResourceReindexingSvc myResourceReindexingSvc;
|
||||
|
||||
@Override
|
||||
public void addTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {
|
||||
|
@ -236,8 +236,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
|
||||
}
|
||||
|
||||
Date updateTime = new Date();
|
||||
ResourceTable savedEntity = updateEntity(theRequest, null, entity, updateTime, updateTime);
|
||||
ResourceTable savedEntity = updateEntityForDelete(theRequest, entity);
|
||||
resourceToDelete.setId(entity.getIdDt());
|
||||
|
||||
// Notify JPA interceptors
|
||||
|
@ -285,15 +284,15 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
public DeleteMethodOutcome deleteByUrl(String theUrl, List<DeleteConflict> deleteConflicts, RequestDetails theRequest) {
|
||||
StopWatch w = new StopWatch();
|
||||
|
||||
Set<Long> resource = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType);
|
||||
if (resource.size() > 1) {
|
||||
Set<Long> resourceIds = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType);
|
||||
if (resourceIds.size() > 1) {
|
||||
if (myDaoConfig.isAllowMultipleDelete() == false) {
|
||||
throw new PreconditionFailedException(getContext().getLocalizer().getMessage(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resource.size()));
|
||||
throw new PreconditionFailedException(getContext().getLocalizer().getMessage(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resourceIds.size()));
|
||||
}
|
||||
}
|
||||
|
||||
List<ResourceTable> deletedResources = new ArrayList<ResourceTable>();
|
||||
for (Long pid : resource) {
|
||||
List<ResourceTable> deletedResources = new ArrayList<>();
|
||||
for (Long pid : resourceIds) {
|
||||
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid);
|
||||
deletedResources.add(entity);
|
||||
|
||||
|
@ -316,8 +315,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
}
|
||||
|
||||
// Perform delete
|
||||
Date updateTime = new Date();
|
||||
updateEntity(theRequest, null, entity, updateTime, updateTime);
|
||||
|
||||
updateEntityForDelete(theRequest, entity);
|
||||
resourceToDelete.setId(entity.getIdDt());
|
||||
|
||||
// Notify JPA interceptors
|
||||
|
@ -558,10 +557,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
throw new PreconditionFailedException("Can not perform version-specific expunge of resource " + theId.toUnqualified().getValue() + " as this is the current version");
|
||||
}
|
||||
|
||||
return doExpunge(getResourceName(), entity.getResourceId(), entity.getVersion(), theExpungeOptions);
|
||||
return myExpungeService.expunge(getResourceName(), entity.getResourceId(), entity.getVersion(), theExpungeOptions);
|
||||
}
|
||||
|
||||
return doExpunge(getResourceName(), entity.getResourceId(), null, theExpungeOptions);
|
||||
return myExpungeService.expunge(getResourceName(), entity.getResourceId(), null, theExpungeOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -569,7 +568,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
|
||||
ourLog.info("Beginning TYPE[{}] expunge operation", getResourceName());
|
||||
|
||||
return doExpunge(getResourceName(), null, null, theExpungeOptions);
|
||||
return myExpungeService.expunge(getResourceName(), null, null, theExpungeOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
|
||||
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOptions;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
|
||||
import ca.uhn.fhir.jpa.util.ResourceCountCache;
|
||||
|
@ -13,7 +11,6 @@ import ca.uhn.fhir.util.StopWatch;
|
|||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
|
@ -22,7 +19,6 @@ import java.util.Date;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/*
|
||||
* #%L
|
||||
|
@ -54,7 +50,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
|
|||
@Override
|
||||
@Transactional(propagation = Propagation.NEVER)
|
||||
public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
|
||||
return doExpunge(null, null, null, theExpungeOptions);
|
||||
return myExpungeService.expunge(null, null, null, theExpungeOptions);
|
||||
}
|
||||
|
||||
@Transactional(propagation = Propagation.REQUIRED)
|
||||
|
|
|
@ -73,6 +73,7 @@ public class DaoConfig {
|
|||
Bundle.BundleType.MESSAGE.toCode()
|
||||
)));
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(DaoConfig.class);
|
||||
private static final int DEFAULT_EXPUNGE_BATCH_SIZE = 800;
|
||||
private IndexEnabledEnum myIndexMissingFieldsEnabled = IndexEnabledEnum.DISABLED;
|
||||
|
||||
/**
|
||||
|
@ -134,7 +135,9 @@ public class DaoConfig {
|
|||
private IdStrategyEnum myResourceServerIdStrategy = IdStrategyEnum.SEQUENTIAL_NUMERIC;
|
||||
private boolean myMarkResourcesForReindexingUponSearchParameterChange;
|
||||
private boolean myExpungeEnabled;
|
||||
private int myExpungeBatchSize = DEFAULT_EXPUNGE_BATCH_SIZE;
|
||||
private int myReindexThreadCount;
|
||||
private int myExpungeThreadCount;
|
||||
private Set<String> myBundleTypesAllowedForStorage;
|
||||
private boolean myValidateSearchParameterExpressionsOnSave = true;
|
||||
private List<Integer> mySearchPreFetchThresholds = Arrays.asList(500, 2000, -1);
|
||||
|
@ -153,6 +156,7 @@ public class DaoConfig {
|
|||
setSubscriptionPurgeInactiveAfterMillis(Long.MAX_VALUE);
|
||||
setMarkResourcesForReindexingUponSearchParameterChange(true);
|
||||
setReindexThreadCount(Runtime.getRuntime().availableProcessors());
|
||||
setExpungeThreadCount(Runtime.getRuntime().availableProcessors());
|
||||
setBundleTypesAllowedForStorage(DEFAULT_BUNDLE_TYPES_ALLOWED_FOR_STORAGE);
|
||||
|
||||
if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) {
|
||||
|
@ -601,6 +605,31 @@ public class DaoConfig {
|
|||
myReindexThreadCount = Math.max(myReindexThreadCount, 1); // Minimum of 1
|
||||
}
|
||||
|
||||
/**
|
||||
* This setting controls the number of threads allocated to the expunge operation
|
||||
* <p>
|
||||
* The default value is set to the number of available processors
|
||||
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
|
||||
* for this setting must be a positive integer.
|
||||
* </p>
|
||||
*/
|
||||
public int getExpungeThreadCount() {
|
||||
return myExpungeThreadCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* This setting controls the number of threads allocated to the expunge operation
|
||||
* <p>
|
||||
* The default value is set to the number of available processors
|
||||
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
|
||||
* for this setting must be a positive integer.
|
||||
* </p>
|
||||
*/
|
||||
public void setExpungeThreadCount(int theExpungeThreadCount) {
|
||||
myExpungeThreadCount = theExpungeThreadCount;
|
||||
myExpungeThreadCount = Math.max(myExpungeThreadCount, 1); // Minimum of 1
|
||||
}
|
||||
|
||||
public ResourceEncodingEnum getResourceEncoding() {
|
||||
return myResourceEncoding;
|
||||
}
|
||||
|
@ -1048,6 +1077,22 @@ public class DaoConfig {
|
|||
myExpungeEnabled = theExpungeEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* The expunge batch size (default 800) determines the number of records deleted within a single transaction by the
|
||||
* expunge operation.
|
||||
*/
|
||||
public void setExpungeBatchSize(int theExpungeBatchSize) {
|
||||
myExpungeBatchSize = theExpungeBatchSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* The expunge batch size (default 800) determines the number of records deleted within a single transaction by the
|
||||
* expunge operation.
|
||||
*/
|
||||
public int getExpungeBatchSize() {
|
||||
return myExpungeBatchSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should contained IDs be indexed the same way that non-contained IDs are (default is
|
||||
* <code>true</code>)
|
||||
|
|
|
@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamCoords;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
public interface IResourceIndexedSearchParamCoordsDao extends JpaRepository<ResourceIndexedSearchParamCoords, Long> {
|
||||
// nothing yet
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamDate;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
public interface IResourceIndexedSearchParamDateDao extends JpaRepository<ResourceIndexedSearchParamDate, Long> {
|
||||
// nothing yet
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamNumber;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
public interface IResourceIndexedSearchParamNumberDao extends JpaRepository<ResourceIndexedSearchParamNumber, Long> {
|
||||
// nothing yet
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,12 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamQuantity;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
public interface IResourceIndexedSearchParamQuantityDao extends JpaRepository<ResourceIndexedSearchParamQuantity, Long> {
|
||||
// nothing yet
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
|
@ -31,4 +32,7 @@ public interface IResourceIndexedSearchParamStringDao extends JpaRepository<Reso
|
|||
@Query("select count(*) from ResourceIndexedSearchParamString t WHERE t.myResourcePid = :resid")
|
||||
int countForResourceId(@Param("resid") Long theResourcePid);
|
||||
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamString t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
|
@ -30,4 +31,7 @@ public interface IResourceIndexedSearchParamTokenDao extends JpaRepository<Resou
|
|||
@Query("select count(*) from ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :resid")
|
||||
int countForResourceId(@Param("resid") Long theResourcePid);
|
||||
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
|||
*/
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
|
@ -32,5 +33,8 @@ public interface IResourceIndexedSearchParamUriDao extends JpaRepository<Resourc
|
|||
|
||||
@Query("SELECT DISTINCT p.myUri FROM ResourceIndexedSearchParamUri p WHERE p.myResourceType = :resource_type AND p.myParamName = :param_name")
|
||||
public Collection<String> findAllByResourceTypeAndParamName(@Param("resource_type") String theResourceType, @Param("param_name") String theParamName);
|
||||
|
||||
|
||||
@Modifying
|
||||
@Query("delete from ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,9 +23,13 @@ package ca.uhn.fhir.jpa.dao.data;
|
|||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceLink;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
public interface IResourceLinkDao extends JpaRepository<ResourceLink, Long> {
|
||||
|
||||
|
||||
|
||||
@Modifying
|
||||
@Query("delete from ResourceLink t WHERE t.mySourceResourcePid = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
|||
*/
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
|
@ -34,4 +35,7 @@ public interface IResourceTagDao extends JpaRepository<ResourceTag, Long> {
|
|||
"INNER JOIN TagDefinition td ON (td.myId = t.myTagId) " +
|
||||
"WHERE t.myResourceId in (:pids)")
|
||||
Collection<ResourceTag> findByResourceIds(@Param("pids") Collection<Long> pids);
|
||||
}
|
||||
|
||||
@Modifying
|
||||
@Query("delete from ResourceTag t WHERE t.myResourceId = :resid")
|
||||
void deleteByResourceId(@Param("resid") Long theResourcePid);}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.jpa.entity.*;
|
||||
import ca.uhn.fhir.jpa.model.entity.*;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import javax.persistence.PersistenceContextType;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Service
|
||||
public class ExpungeEverythingService {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeEverythingService.class);
|
||||
@Autowired
|
||||
private PlatformTransactionManager myPlatformTransactionManager;
|
||||
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
|
||||
protected EntityManager myEntityManager;
|
||||
|
||||
void expungeEverything() {
|
||||
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
ourLog.info("BEGINNING GLOBAL $expunge");
|
||||
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d"));
|
||||
for (TermCodeSystem next : myEntityManager.createQuery("SELECT c FROM " + TermCodeSystem.class.getName() + " c", TermCodeSystem.class).getResultList()) {
|
||||
next.setCurrentVersion(null);
|
||||
myEntityManager.merge(next);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
txTemplate.execute(t -> {
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d"));
|
||||
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d"));
|
||||
return null;
|
||||
});
|
||||
|
||||
ourLog.info("COMPLETED GLOBAL $expunge - Deleted {} rows", counter.get());
|
||||
}
|
||||
|
||||
private int doExpungeEverythingQuery(String theQuery) {
|
||||
StopWatch sw = new StopWatch();
|
||||
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
|
||||
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
|
||||
return outcome;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOptions;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Component
|
||||
@Scope("prototype")
|
||||
public class ExpungeRun implements Callable<ExpungeOutcome> {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
|
||||
|
||||
@Autowired
|
||||
private PlatformTransactionManager myPlatformTransactionManager;
|
||||
@Autowired
|
||||
private IResourceExpungeService myExpungeDaoService;
|
||||
@Autowired
|
||||
private PartitionRunner myPartitionRunner;
|
||||
|
||||
private final String myResourceName;
|
||||
private final Long myResourceId;
|
||||
private final Long myVersion;
|
||||
private final ExpungeOptions myExpungeOptions;
|
||||
private final AtomicInteger myRemainingCount;
|
||||
|
||||
public ExpungeRun(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
|
||||
myResourceName = theResourceName;
|
||||
myResourceId = theResourceId;
|
||||
myVersion = theVersion;
|
||||
myExpungeOptions = theExpungeOptions;
|
||||
myRemainingCount = new AtomicInteger(myExpungeOptions.getLimit());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExpungeOutcome call() {
|
||||
if (myExpungeOptions.isExpungeDeletedResources() && myVersion == null) {
|
||||
expungeDeletedResources();
|
||||
if (expungeLimitReached()) {
|
||||
return expungeOutcome();
|
||||
}
|
||||
}
|
||||
|
||||
if (myExpungeOptions.isExpungeOldVersions()) {
|
||||
expungeOldVersions();
|
||||
if (expungeLimitReached()) {
|
||||
return expungeOutcome();
|
||||
}
|
||||
}
|
||||
|
||||
return expungeOutcome();
|
||||
}
|
||||
|
||||
private void expungeDeletedResources() {
|
||||
Slice<Long> resourceIds = findHistoricalVersionsOfDeletedResources();
|
||||
|
||||
deleteSearchResultCacheEntries(resourceIds);
|
||||
deleteHistoricalVersions(resourceIds);
|
||||
if (expungeLimitReached()) {
|
||||
return;
|
||||
}
|
||||
|
||||
deleteCurrentVersionsOfDeletedResources(resourceIds);
|
||||
}
|
||||
|
||||
private Slice<Long> findHistoricalVersionsOfDeletedResources() {
|
||||
return myExpungeDaoService.findHistoricalVersionsOfDeletedResources(myResourceName, myResourceId, myRemainingCount.get());
|
||||
}
|
||||
|
||||
private Slice<Long> findHistoricalVersionsOfNonDeletedResources() {
|
||||
return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources(myResourceName, myResourceId, myVersion, myRemainingCount.get());
|
||||
}
|
||||
|
||||
private boolean expungeLimitReached() {
|
||||
boolean expungeLimitReached = myRemainingCount.get() <= 0;
|
||||
if (expungeLimitReached) {
|
||||
ourLog.debug("Expunge limit has been hit - Stopping operation");
|
||||
}
|
||||
return expungeLimitReached;
|
||||
}
|
||||
|
||||
private void expungeOldVersions() {
|
||||
Slice<Long> historicalIds = findHistoricalVersionsOfNonDeletedResources();
|
||||
|
||||
myPartitionRunner.runInPartitionedThreads(historicalIds, partition -> myExpungeDaoService.expungeHistoricalVersions(partition, myRemainingCount));
|
||||
}
|
||||
|
||||
private void deleteCurrentVersionsOfDeletedResources(Slice<Long> theResourceIds) {
|
||||
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeCurrentVersionOfResources(partition, myRemainingCount));
|
||||
}
|
||||
|
||||
private void deleteHistoricalVersions(Slice<Long> theResourceIds) {
|
||||
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds(partition, myRemainingCount));
|
||||
}
|
||||
|
||||
private void deleteSearchResultCacheEntries(Slice<Long> theResourceIds) {
|
||||
myPartitionRunner.runInPartitionedThreads(theResourceIds, partition -> myExpungeDaoService.deleteByResourceIdPartitions(partition));
|
||||
}
|
||||
|
||||
private ExpungeOutcome expungeOutcome() {
|
||||
return new ExpungeOutcome().setDeletedCount(myExpungeOptions.getLimit() - myRemainingCount.get());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOptions;
|
||||
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Lookup;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public abstract class ExpungeService {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
|
||||
|
||||
@Autowired
|
||||
private DaoConfig myConfig;
|
||||
@Autowired
|
||||
private ExpungeEverythingService myExpungeEverythingService;
|
||||
@Autowired
|
||||
private IResourceExpungeService myExpungeDaoService;
|
||||
|
||||
@Lookup
|
||||
protected abstract ExpungeRun getExpungeRun(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions);
|
||||
|
||||
public ExpungeOutcome expunge(String theResourceName, Long theResourceId, Long theVersion, ExpungeOptions theExpungeOptions) {
|
||||
ourLog.info("Expunge: ResourceName[{}] Id[{}] Version[{}] Options[{}]", theResourceName, theResourceId, theVersion, theExpungeOptions);
|
||||
|
||||
if (!myConfig.isExpungeEnabled()) {
|
||||
throw new MethodNotAllowedException("$expunge is not enabled on this server");
|
||||
}
|
||||
|
||||
if (theExpungeOptions.getLimit() < 1) {
|
||||
throw new InvalidRequestException("Expunge limit may not be less than 1. Received expunge limit " + theExpungeOptions.getLimit() + ".");
|
||||
}
|
||||
|
||||
if (theResourceName == null && theResourceId == null && theVersion == null) {
|
||||
if (theExpungeOptions.isExpungeEverything()) {
|
||||
myExpungeEverythingService.expungeEverything();
|
||||
}
|
||||
}
|
||||
|
||||
ExpungeRun expungeRun = getExpungeRun(theResourceName, theResourceId, theVersion, theExpungeOptions);
|
||||
return expungeRun.call();
|
||||
}
|
||||
|
||||
public void deleteAllSearchParams(Long theResourceId) {
|
||||
myExpungeDaoService.deleteAllSearchParams(theResourceId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import org.springframework.data.domain.Slice;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public interface IResourceExpungeService {
|
||||
Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theI);
|
||||
|
||||
Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theI);
|
||||
|
||||
void expungeHistoricalVersions(List<Long> thePartition, AtomicInteger theRemainingCount);
|
||||
|
||||
void expungeCurrentVersionOfResources(List<Long> thePartition, AtomicInteger theRemainingCount);
|
||||
|
||||
void expungeHistoricalVersionsOfIds(List<Long> thePartition, AtomicInteger theRemainingCount);
|
||||
|
||||
void deleteByResourceIdPartitions(List<Long> thePartition);
|
||||
|
||||
void deleteAllSearchParams(Long theResourceId);
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Service
|
||||
public class PartitionRunner {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ExpungeService.class);
|
||||
private static final int MAX_POOL_SIZE = 1000;
|
||||
|
||||
private final DaoConfig myDaoConfig;
|
||||
|
||||
@Autowired
|
||||
public PartitionRunner(DaoConfig theDaoConfig) {
|
||||
myDaoConfig = theDaoConfig;
|
||||
}
|
||||
|
||||
void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
||||
|
||||
List<Callable<Void>> callableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
|
||||
if (callableTasks.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
ExecutorService executorService = buildExecutor(callableTasks.size());
|
||||
try {
|
||||
List<Future<Void>> futures = executorService.invokeAll(callableTasks);
|
||||
// wait for all the threads to finish
|
||||
for (Future<Void> future : futures) {
|
||||
future.get();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
ourLog.error("Interrupted while expunging.", e);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
ourLog.error("Error while expunging.", e);
|
||||
throw new InternalErrorException(e);
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
|
||||
List<Callable<Void>> retval = new ArrayList<>();
|
||||
|
||||
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myDaoConfig.getExpungeBatchSize());
|
||||
|
||||
for (List<Long> nextPartition : partitions) {
|
||||
Callable<Void> callableTask = () -> {
|
||||
ourLog.info("Expunging any search results pointing to {} resources", nextPartition.size());
|
||||
partitionConsumer.accept(nextPartition);
|
||||
return null;
|
||||
};
|
||||
retval.add(callableTask);
|
||||
}
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
|
||||
private ExecutorService buildExecutor(int numberOfTasks) {
|
||||
int threadCount = Math.min(numberOfTasks, myDaoConfig.getExpungeThreadCount());
|
||||
assert (threadCount > 0);
|
||||
|
||||
ourLog.info("Expunging with {} threads", threadCount);
|
||||
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
|
||||
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("expunge-%d")
|
||||
.daemon(false)
|
||||
.priority(Thread.NORM_PRIORITY)
|
||||
.build();
|
||||
RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
|
||||
ourLog.info("Note: Expunge executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
|
||||
StopWatch sw = new StopWatch();
|
||||
try {
|
||||
executorQueue.put(theRunnable);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RejectedExecutionException("Task " + theRunnable.toString() +
|
||||
" rejected from " + e.toString());
|
||||
}
|
||||
ourLog.info("Slot become available after {}ms", sw.getMillis());
|
||||
};
|
||||
return new ThreadPoolExecutor(
|
||||
threadCount,
|
||||
MAX_POOL_SIZE,
|
||||
0L,
|
||||
TimeUnit.MILLISECONDS,
|
||||
executorQueue,
|
||||
threadFactory,
|
||||
rejectedExecutionHandler);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.data.*;
|
||||
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
|
||||
import ca.uhn.fhir.jpa.model.entity.ForcedId;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.data.domain.SliceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Service
|
||||
class ResourceExpungeService implements IResourceExpungeService {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ResourceExpungeService.class);
|
||||
|
||||
@Autowired
|
||||
private IResourceTableDao myResourceTableDao;
|
||||
@Autowired
|
||||
private ISearchResultDao mySearchResultDao;
|
||||
@Autowired
|
||||
private IResourceHistoryTableDao myResourceHistoryTableDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamUriDao myResourceIndexedSearchParamUriDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamStringDao myResourceIndexedSearchParamStringDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamTokenDao myResourceIndexedSearchParamTokenDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamDateDao myResourceIndexedSearchParamDateDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamQuantityDao myResourceIndexedSearchParamQuantityDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamCoordsDao myResourceIndexedSearchParamCoordsDao;
|
||||
@Autowired
|
||||
private IResourceIndexedSearchParamNumberDao myResourceIndexedSearchParamNumberDao;
|
||||
@Autowired
|
||||
private IResourceLinkDao myResourceLinkDao;
|
||||
@Autowired
|
||||
private IResourceTagDao myResourceTagDao;
|
||||
@Autowired
|
||||
private IdHelperService myIdHelperService;
|
||||
@Autowired
|
||||
private IResourceHistoryTagDao myResourceHistoryTagDao;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Slice<Long> findHistoricalVersionsOfNonDeletedResources(String theResourceName, Long theResourceId, Long theVersion, int theRemainingCount) {
|
||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||
if (theResourceId != null) {
|
||||
if (theVersion != null) {
|
||||
return toSlice(myResourceHistoryTableDao.findForIdAndVersion(theResourceId, theVersion));
|
||||
} else {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResourceId(page, theResourceId);
|
||||
}
|
||||
} else {
|
||||
if (theResourceName != null) {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page, theResourceName);
|
||||
} else {
|
||||
return myResourceHistoryTableDao.findIdsOfPreviousVersionsOfResources(page);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Slice<Long> findHistoricalVersionsOfDeletedResources(String theResourceName, Long theResourceId, int theRemainingCount) {
|
||||
Pageable page = PageRequest.of(0, theRemainingCount);
|
||||
if (theResourceId != null) {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceId, theResourceName);
|
||||
ourLog.info("Expunging {} deleted resources of type[{}] and ID[{}]", ids.getNumberOfElements(), theResourceName, theResourceId);
|
||||
return ids;
|
||||
} else {
|
||||
if (theResourceName != null) {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResourcesOfType(page, theResourceName);
|
||||
ourLog.info("Expunging {} deleted resources of type[{}]", ids.getNumberOfElements(), theResourceName);
|
||||
return ids;
|
||||
} else {
|
||||
Slice<Long> ids = myResourceTableDao.findIdsOfDeletedResources(page);
|
||||
ourLog.info("Expunging {} deleted resources (all types)", ids.getNumberOfElements());
|
||||
return ids;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void expungeCurrentVersionOfResources(List<Long> theResourceIds, AtomicInteger theRemainingCount) {
|
||||
for (Long next : theResourceIds) {
|
||||
expungeCurrentVersionOfResource(next, theRemainingCount);
|
||||
if (theRemainingCount.get() <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void expungeHistoricalVersion(Long theNextVersionId) {
|
||||
ResourceHistoryTable version = myResourceHistoryTableDao.findById(theNextVersionId).orElseThrow(IllegalArgumentException::new);
|
||||
ourLog.info("Deleting resource version {}", version.getIdDt().getValue());
|
||||
|
||||
myResourceHistoryTagDao.deleteAll(version.getTags());
|
||||
myResourceHistoryTableDao.delete(version);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void expungeHistoricalVersionsOfIds(List<Long> theResourceIds, AtomicInteger theRemainingCount) {
|
||||
for (Long next : theResourceIds) {
|
||||
expungeHistoricalVersionsOfId(next, theRemainingCount);
|
||||
if (theRemainingCount.get() <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void expungeHistoricalVersions(List<Long> theHistoricalIds, AtomicInteger theRemainingCount) {
|
||||
for (Long next : theHistoricalIds) {
|
||||
expungeHistoricalVersion(next);
|
||||
if (theRemainingCount.decrementAndGet() <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void expungeCurrentVersionOfResource(Long myResourceId, AtomicInteger theRemainingCount) {
|
||||
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalStateException::new);
|
||||
|
||||
ResourceHistoryTable currentVersion = myResourceHistoryTableDao.findForIdAndVersion(resource.getId(), resource.getVersion());
|
||||
if (currentVersion != null) {
|
||||
expungeHistoricalVersion(currentVersion.getId());
|
||||
}
|
||||
|
||||
ourLog.info("Expunging current version of resource {}", resource.getIdDt().getValue());
|
||||
|
||||
deleteAllSearchParams(resource.getResourceId());
|
||||
resource.getTags().clear();
|
||||
|
||||
if (resource.getForcedId() != null) {
|
||||
ForcedId forcedId = resource.getForcedId();
|
||||
resource.setForcedId(null);
|
||||
myResourceTableDao.saveAndFlush(resource);
|
||||
myIdHelperService.delete(forcedId);
|
||||
}
|
||||
|
||||
myResourceTableDao.delete(resource);
|
||||
|
||||
theRemainingCount.decrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void deleteAllSearchParams(Long theResourceId) {
|
||||
myResourceIndexedSearchParamUriDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamCoordsDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamDateDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamNumberDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamQuantityDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamStringDao.deleteByResourceId(theResourceId);
|
||||
myResourceIndexedSearchParamTokenDao.deleteByResourceId(theResourceId);
|
||||
myResourceLinkDao.deleteByResourceId(theResourceId);
|
||||
|
||||
myResourceTagDao.deleteByResourceId(theResourceId);
|
||||
}
|
||||
|
||||
private void expungeHistoricalVersionsOfId(Long myResourceId, AtomicInteger theRemainingCount) {
|
||||
ResourceTable resource = myResourceTableDao.findById(myResourceId).orElseThrow(IllegalArgumentException::new);
|
||||
|
||||
Pageable page = PageRequest.of(0, theRemainingCount.get());
|
||||
|
||||
Slice<Long> versionIds = myResourceHistoryTableDao.findForResourceId(page, resource.getId(), resource.getVersion());
|
||||
ourLog.debug("Found {} versions of resource {} to expunge", versionIds.getNumberOfElements(), resource.getIdDt().getValue());
|
||||
for (Long nextVersionId : versionIds) {
|
||||
expungeHistoricalVersion(nextVersionId);
|
||||
if (theRemainingCount.decrementAndGet() <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void deleteByResourceIdPartitions(List<Long> theResourceIds) {
|
||||
mySearchResultDao.deleteByResourceIds(theResourceIds);
|
||||
}
|
||||
|
||||
private Slice<Long> toSlice(ResourceHistoryTable myVersion) {
|
||||
Validate.notNull(myVersion);
|
||||
return new SliceImpl<>(Collections.singletonList(myVersion.getId()));
|
||||
}
|
||||
}
|
|
@ -26,7 +26,10 @@ import net.ttddyy.dsproxy.proxy.ParameterSetOperation;
|
|||
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
|
||||
import org.hibernate.engine.jdbc.internal.BasicFormatterImpl;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.trim;
|
||||
|
@ -55,7 +58,9 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
for (QueryInfo next : theQueryInfoList) {
|
||||
String sql = trim(next.getQuery());
|
||||
List<String> params;
|
||||
int size = 0;
|
||||
if (next.getParametersList().size() > 0 && next.getParametersList().get(0).size() > 0) {
|
||||
size = next.getParametersList().size();
|
||||
List<ParameterSetOperation> values = next
|
||||
.getParametersList()
|
||||
.get(0);
|
||||
|
@ -74,7 +79,7 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
|
||||
long elapsedTime = theExecutionInfo.getElapsedTime();
|
||||
long startTime = System.currentTimeMillis() - elapsedTime;
|
||||
queryList.add(new Query(sql, params, startTime, elapsedTime, stackTraceElements));
|
||||
queryList.add(new Query(sql, params, startTime, elapsedTime, stackTraceElements, size));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,13 +92,15 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
private final long myQueryTimestamp;
|
||||
private final long myElapsedTime;
|
||||
private final StackTraceElement[] myStackTrace;
|
||||
private final int mySize;
|
||||
|
||||
Query(String theSql, List<String> theParams, long theQueryTimestamp, long theElapsedTime, StackTraceElement[] theStackTraceElements) {
|
||||
Query(String theSql, List<String> theParams, long theQueryTimestamp, long theElapsedTime, StackTraceElement[] theStackTraceElements, int theSize) {
|
||||
mySql = theSql;
|
||||
myParams = Collections.unmodifiableList(theParams);
|
||||
myQueryTimestamp = theQueryTimestamp;
|
||||
myElapsedTime = theElapsedTime;
|
||||
myStackTrace = theStackTraceElements;
|
||||
mySize = theSize;
|
||||
}
|
||||
|
||||
public long getQueryTimestamp() {
|
||||
|
@ -121,7 +128,6 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
|
||||
if (theInlineParams) {
|
||||
List<String> nextParams = new ArrayList<>(myParams);
|
||||
|
||||
int idx = 0;
|
||||
while (nextParams.size() > 0) {
|
||||
idx = retVal.indexOf("?", idx);
|
||||
|
@ -134,6 +140,9 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
}
|
||||
}
|
||||
|
||||
if (mySize > 1) {
|
||||
retVal += "\nsize: " + mySize + "\n";
|
||||
}
|
||||
return trim(retVal);
|
||||
|
||||
}
|
||||
|
@ -141,6 +150,10 @@ public abstract class BaseCaptureQueriesListener implements ProxyDataSourceBuild
|
|||
public StackTraceElement[] getStackTrace() {
|
||||
return myStackTrace;
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return mySize;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -211,6 +211,39 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
|
|||
ourLog.info("Insert Queries:\n{}", String.join("\n", queries));
|
||||
}
|
||||
|
||||
/**
|
||||
* Log all captured INSERT queries
|
||||
*/
|
||||
public void logUpdateQueries() {
|
||||
List<String> queries = getUpdateQueries()
|
||||
.stream()
|
||||
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
|
||||
.collect(Collectors.toList());
|
||||
ourLog.info("Update Queries:\n{}", String.join("\n", queries));
|
||||
}
|
||||
|
||||
/**
|
||||
* Log all captured DELETE queries
|
||||
*/
|
||||
public void logDeleteQueriesForCurrentThread() {
|
||||
List<String> queries = getDeleteQueriesForCurrentThread()
|
||||
.stream()
|
||||
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
|
||||
.collect(Collectors.toList());
|
||||
ourLog.info("Delete Queries:\n{}", String.join("\n", queries));
|
||||
}
|
||||
|
||||
/**
|
||||
* Log all captured DELETE queries
|
||||
*/
|
||||
public void logDeleteQueries() {
|
||||
List<String> queries = getDeleteQueries()
|
||||
.stream()
|
||||
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
|
||||
.collect(Collectors.toList());
|
||||
ourLog.info("Delete Queries:\n{}", String.join("\n", queries));
|
||||
}
|
||||
|
||||
public int countSelectQueries() {
|
||||
return getSelectQueries().size();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
package ca.uhn.fhir.jpa.dao.expunge;
|
||||
|
||||
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||
import ca.uhn.fhir.jpa.config.TestDstu3Config;
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.data.domain.SliceImpl;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.isOneOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@ContextConfiguration(classes = {TestDstu3Config.class})
|
||||
public class PartitionRunnerTest {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunnerTest.class);
|
||||
private static final String EXPUNGE_THREADNAME_1 = "expunge-1";
|
||||
private static final String EXPUNGE_THREADNAME_2 = "expunge-2";
|
||||
|
||||
@Autowired
|
||||
private PartitionRunner myPartitionRunner;
|
||||
|
||||
@Autowired
|
||||
private DaoConfig myDaoConfig;
|
||||
private PointcutLatch myLatch = new PointcutLatch("partition call");
|
||||
|
||||
@After
|
||||
public void before() {
|
||||
myDaoConfig.setExpungeThreadCount(new DaoConfig().getExpungeThreadCount());
|
||||
myDaoConfig.setExpungeBatchSize(new DaoConfig().getExpungeBatchSize());
|
||||
myLatch.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyList() {
|
||||
Slice<Long> resourceIds = buildSlice(0);
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(0);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
myLatch.clear();
|
||||
}
|
||||
|
||||
private Slice<Long> buildSlice(int size) {
|
||||
List<Long> list = new ArrayList<>();
|
||||
for (long i = 0; i < size; ++i) {
|
||||
list.add(i + 1);
|
||||
}
|
||||
return new SliceImpl(list);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void oneItem() throws InterruptedException {
|
||||
Slice<Long> resourceIds = buildSlice(1);
|
||||
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(1);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(myLatch.awaitExpected());
|
||||
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
|
||||
assertEquals(1, partitionCall.size);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void twoItems() throws InterruptedException {
|
||||
Slice<Long> resourceIds = buildSlice(2);
|
||||
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(1);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(myLatch.awaitExpected());
|
||||
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
|
||||
assertEquals(2, partitionCall.size);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void tenItemsBatch5() throws InterruptedException {
|
||||
Slice<Long> resourceIds = buildSlice(10);
|
||||
myDaoConfig.setExpungeBatchSize(5);
|
||||
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(2);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
List<HookParams> calls = myLatch.awaitExpected();
|
||||
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
|
||||
assertThat(partitionCall1.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
|
||||
assertEquals(5, partitionCall1.size);
|
||||
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
|
||||
assertThat(partitionCall2.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
|
||||
assertEquals(5, partitionCall2.size);
|
||||
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nineItemsBatch5() throws InterruptedException {
|
||||
Slice<Long> resourceIds = buildSlice(9);
|
||||
myDaoConfig.setExpungeBatchSize(5);
|
||||
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(2);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
List<HookParams> calls = myLatch.awaitExpected();
|
||||
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
|
||||
assertThat(partitionCall1.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
|
||||
assertEquals(5, partitionCall1.size);
|
||||
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
|
||||
assertThat(partitionCall2.threadName, isOneOf(EXPUNGE_THREADNAME_1, EXPUNGE_THREADNAME_2));
|
||||
assertEquals(4, partitionCall2.size);
|
||||
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void tenItemsOneThread() throws InterruptedException {
|
||||
Slice<Long> resourceIds = buildSlice(10);
|
||||
myDaoConfig.setExpungeBatchSize(5);
|
||||
myDaoConfig.setExpungeThreadCount(1);
|
||||
|
||||
Consumer<List<Long>> partitionConsumer = buildPartitionConsumer(myLatch);
|
||||
myLatch.setExpectedCount(2);
|
||||
myPartitionRunner.runInPartitionedThreads(resourceIds, partitionConsumer);
|
||||
List<HookParams> calls = myLatch.awaitExpected();
|
||||
{
|
||||
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
|
||||
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
|
||||
assertEquals(5, partitionCall.size);
|
||||
}
|
||||
{
|
||||
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
|
||||
assertEquals(EXPUNGE_THREADNAME_1, partitionCall.threadName);
|
||||
assertEquals(5, partitionCall.size);
|
||||
}
|
||||
}
|
||||
|
||||
private Consumer<List<Long>> buildPartitionConsumer(PointcutLatch latch) {
|
||||
return list -> latch.call(new PartitionCall(Thread.currentThread().getName(), list.size()));
|
||||
}
|
||||
|
||||
static class PartitionCall {
|
||||
private final String threadName;
|
||||
private final int size;
|
||||
|
||||
PartitionCall(String theThreadName, int theSize) {
|
||||
threadName = theThreadName;
|
||||
size = theSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this)
|
||||
.append("myThreadName", threadName)
|
||||
.append("mySize", size)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package ca.uhn.fhir.jpa.provider.dstu3;
|
||||
|
||||
import ca.uhn.fhir.jpa.util.BaseCaptureQueriesListener;
|
||||
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
|
||||
import org.hl7.fhir.dstu3.model.CodeableConcept;
|
||||
import org.hl7.fhir.dstu3.model.Observation;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ResourceProviderDeleteSqlDstu3Test extends BaseResourceProviderDstu3Test {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ResourceProviderDeleteSqlDstu3Test.class);
|
||||
|
||||
@Autowired
|
||||
protected CircularQueueCaptureQueriesListener myCaptureQueriesListener;
|
||||
|
||||
@Test
|
||||
public void testDeleteFortyTokensWithOneCommand() {
|
||||
Observation o = new Observation();
|
||||
o.setStatus(Observation.ObservationStatus.FINAL);
|
||||
for (int i = 0; i < 40; ++i) {
|
||||
CodeableConcept code = new CodeableConcept();
|
||||
code.addCoding().setSystem("foo").setCode("Code" + i);
|
||||
o.getCategory().add(code);
|
||||
}
|
||||
IIdType observationId = myObservationDao.create(o).getId();
|
||||
|
||||
myCaptureQueriesListener.clear();
|
||||
myObservationDao.delete(observationId);
|
||||
myCaptureQueriesListener.logDeleteQueries();
|
||||
long deleteCount = myCaptureQueriesListener.getDeleteQueries()
|
||||
.stream()
|
||||
.filter(query -> query.getSql(false, false).contains("HFJ_SPIDX_TOKEN"))
|
||||
.collect(Collectors.summarizingInt(BaseCaptureQueriesListener.Query::getSize))
|
||||
.getSum();
|
||||
assertEquals(1, deleteCount);
|
||||
}
|
||||
}
|
|
@ -1,23 +1,63 @@
|
|||
package ca.uhn.fhir.jpa.provider.r4;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.containsInRelativeOrder;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.emptyString;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasItems;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.hamcrest.Matchers.stringContainsInOrder;
|
||||
import static org.junit.Assert.*;
|
||||
import ca.uhn.fhir.jpa.config.TestR4Config;
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.entity.Search;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
||||
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
import ca.uhn.fhir.jpa.util.TestUtil;
|
||||
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
|
||||
import ca.uhn.fhir.model.primitive.InstantDt;
|
||||
import ca.uhn.fhir.model.primitive.UriDt;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.parser.StrictErrorHandler;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.*;
|
||||
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
|
||||
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
||||
import ca.uhn.fhir.rest.client.api.IHttpRequest;
|
||||
import ca.uhn.fhir.rest.client.api.IHttpResponse;
|
||||
import ca.uhn.fhir.rest.client.interceptor.CapturingInterceptor;
|
||||
import ca.uhn.fhir.rest.gclient.StringClientParam;
|
||||
import ca.uhn.fhir.rest.param.*;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import ca.uhn.fhir.util.UrlUtil;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
||||
import org.apache.http.client.methods.*;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.message.BasicNameValuePair;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.hl7.fhir.instance.model.api.*;
|
||||
import org.hl7.fhir.r4.hapi.validation.FhirInstanceValidator;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.hl7.fhir.r4.model.Bundle.*;
|
||||
import org.hl7.fhir.r4.model.Encounter.EncounterLocationComponent;
|
||||
import org.hl7.fhir.r4.model.Encounter.EncounterStatus;
|
||||
import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
|
||||
import org.hl7.fhir.r4.model.Narrative.NarrativeStatus;
|
||||
import org.hl7.fhir.r4.model.Observation.ObservationStatus;
|
||||
import org.hl7.fhir.r4.model.Questionnaire.QuestionnaireItemType;
|
||||
import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType;
|
||||
import org.hl7.fhir.r4.model.Subscription.SubscriptionStatus;
|
||||
import org.junit.*;
|
||||
import org.springframework.test.util.AopTestUtils;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
|
@ -27,97 +67,12 @@ import java.net.InetSocketAddress;
|
|||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.util.TestUtil;
|
||||
import ca.uhn.fhir.rest.api.*;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpDelete;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPatch;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.message.BasicNameValuePair;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.hl7.fhir.instance.model.api.*;
|
||||
import org.hl7.fhir.r4.hapi.validation.FhirInstanceValidator;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
|
||||
import org.hl7.fhir.r4.model.Bundle.BundleLinkComponent;
|
||||
import org.hl7.fhir.r4.model.Bundle.BundleType;
|
||||
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
|
||||
import org.hl7.fhir.r4.model.Bundle.SearchEntryMode;
|
||||
import org.hl7.fhir.r4.model.Encounter.EncounterLocationComponent;
|
||||
import org.hl7.fhir.r4.model.Encounter.EncounterStatus;
|
||||
import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
|
||||
import org.hl7.fhir.r4.model.Narrative.NarrativeStatus;
|
||||
import org.hl7.fhir.r4.model.Observation.ObservationStatus;
|
||||
import org.hl7.fhir.r4.model.Questionnaire.QuestionnaireItemType;
|
||||
import org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType;
|
||||
import org.hl7.fhir.r4.model.Subscription.SubscriptionStatus;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.test.util.AopTestUtils;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import ca.uhn.fhir.jpa.config.TestR4Config;
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
|
||||
import ca.uhn.fhir.jpa.entity.Search;
|
||||
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
|
||||
import ca.uhn.fhir.model.primitive.InstantDt;
|
||||
import ca.uhn.fhir.model.primitive.UriDt;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.parser.StrictErrorHandler;
|
||||
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
|
||||
import ca.uhn.fhir.rest.client.api.IGenericClient;
|
||||
import ca.uhn.fhir.rest.client.api.IHttpRequest;
|
||||
import ca.uhn.fhir.rest.client.api.IHttpResponse;
|
||||
import ca.uhn.fhir.rest.client.interceptor.CapturingInterceptor;
|
||||
import ca.uhn.fhir.rest.gclient.StringClientParam;
|
||||
import ca.uhn.fhir.rest.param.DateRangeParam;
|
||||
import ca.uhn.fhir.rest.param.NumberParam;
|
||||
import ca.uhn.fhir.rest.param.ParamPrefixEnum;
|
||||
import ca.uhn.fhir.rest.param.StringAndListParam;
|
||||
import ca.uhn.fhir.rest.param.StringOrListParam;
|
||||
import ca.uhn.fhir.rest.param.StringParam;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import ca.uhn.fhir.util.UrlUtil;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
|
||||
|
@ -2035,6 +1990,8 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
|
|||
}
|
||||
|
||||
assertThat(ids, hasItem(id.getIdPart()));
|
||||
|
||||
// TODO KHS this fails intermittently with 53 instead of 77
|
||||
assertEquals(LARGE_NUMBER, ids.size());
|
||||
for (int i = 1; i < LARGE_NUMBER; i++) {
|
||||
assertThat(ids.size() + " ids: " + ids, ids, hasItem("A" + StringUtils.leftPad(Integer.toString(i), 2, '0')));
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package ca.uhn.fhir.jpa.subscription.module;
|
||||
package ca.uhn.fhir.jpa.model.concurrency;
|
||||
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ca.uhn.fhir.jpa.subscription.module;
|
||||
package ca.uhn.fhir.jpa.model.concurrency;
|
||||
|
||||
/*-
|
||||
* #%L
|
|
@ -1,9 +1,10 @@
|
|||
package ca.uhn.fhir.jpa.subscription.module;
|
||||
package ca.uhn.fhir.jpa.model.concurrency;
|
||||
|
||||
|
||||
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -15,10 +16,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
// TODO KHS copy this version over to hapi-fhir
|
||||
// This class is primarily used for testing.
|
||||
public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class);
|
||||
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
|
||||
|
@ -77,7 +75,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
|
|||
public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
|
||||
List<HookParams> retval = myCalledWith.get();
|
||||
try {
|
||||
assertNotNull(getName() + " awaitExpected() called before setExpected() called.", myCountdownLatch);
|
||||
Validate.notNull(myCountdownLatch, getName() + " awaitExpected() called before setExpected() called.");
|
||||
if (!myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)) {
|
||||
throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + myCountdownLatch.getCount() + ".");
|
||||
}
|
||||
|
@ -97,7 +95,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
|
|||
} finally {
|
||||
clear();
|
||||
}
|
||||
assertEquals("Concurrency error: Latch switched while waiting.", retval, myCalledWith.get());
|
||||
Validate.isTrue(retval.equals(myCalledWith.get()), "Concurrency error: Latch switched while waiting.");
|
||||
return retval;
|
||||
}
|
||||
|
||||
|
@ -173,10 +171,15 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
|
|||
}
|
||||
|
||||
public static Object getLatchInvocationParameter(List<HookParams> theHookParams) {
|
||||
assertNotNull(theHookParams);
|
||||
assertEquals("Expected Pointcut to be invoked 1 time", 1, theHookParams.size());
|
||||
HookParams arg = theHookParams.get(0);
|
||||
assertEquals("Expected pointcut to be invoked with 1 argument", 1, arg.values().size());
|
||||
Validate.notNull(theHookParams);
|
||||
Validate.isTrue(theHookParams.size() == 1, "Expected Pointcut to be invoked 1 time");
|
||||
return getLatchInvocationParameter(theHookParams, 0);
|
||||
}
|
||||
|
||||
public static Object getLatchInvocationParameter(List<HookParams> theHookParams, int index) {
|
||||
Validate.notNull(theHookParams);
|
||||
HookParams arg = theHookParams.get(index);
|
||||
Validate.isTrue(arg.values().size() == 1, "Expected pointcut to be invoked with 1 argument");
|
||||
return arg.values().iterator().next();
|
||||
}
|
||||
}
|
|
@ -4,9 +4,9 @@ import ca.uhn.fhir.context.FhirContext;
|
|||
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||
import ca.uhn.fhir.interceptor.api.IInterceptorService;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.jpa.model.concurrency.IPointcutLatch;
|
||||
import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
|
||||
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
||||
import ca.uhn.fhir.jpa.subscription.module.IPointcutLatch;
|
||||
import ca.uhn.fhir.jpa.subscription.module.PointcutLatch;
|
||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
package ca.uhn.fhirtest.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
|
||||
import ca.uhn.fhir.rest.server.interceptor.LoggingInterceptor;
|
||||
import ca.uhn.fhirtest.interceptor.AnalyticsInterceptor;
|
||||
import ca.uhn.fhirtest.joke.HolyFooCowInterceptor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class CommonConfig {
|
||||
|
|
|
@ -2,10 +2,13 @@ package ca.uhn.fhir.util;
|
|||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class BundleUtilTest {
|
||||
|
||||
private static FhirContext ourCtx = FhirContext.forR4();
|
||||
|
@ -38,6 +41,17 @@ public class BundleUtilTest {
|
|||
Assert.assertEquals(null, BundleUtil.getTotal(ourCtx, b));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toListOfResourcesOfTypeTest() {
|
||||
Bundle bundle = new Bundle();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
bundle.addEntry(new Bundle.BundleEntryComponent().setResource(new Patient()));
|
||||
}
|
||||
List<Patient> list = BundleUtil.toListOfResourcesOfType(ourCtx, bundle, Patient.class);
|
||||
Assert.assertEquals(5, list.size());
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void afterClassClearContext() {
|
||||
TestUtil.clearAllStaticFieldsForUnitTest();
|
||||
|
|
|
@ -175,7 +175,7 @@
|
|||
The JSON Patch provider has been switched to use the provider from the
|
||||
<![CDATA[
|
||||
<a href="https://github.com/java-json-tools/json-patch">Java JSON Tools</a>
|
||||
]]>
|
||||
]]>
|
||||
project, as it is much more robust and fault tolerant.
|
||||
</action>
|
||||
<action type="fix">
|
||||
|
@ -186,6 +186,10 @@
|
|||
ensure that the patch was valid for the candidate resource. This means that invalid patches
|
||||
are caught and not just silently ignored.
|
||||
</action>
|
||||
<action type="add">
|
||||
Expunges are now done in batches in multiple threads. Both the number of expunge threads and batch size are configurable
|
||||
in DaoConfig.
|
||||
</action>
|
||||
</release>
|
||||
<release version="3.7.0" date="2019-02-06" description="Gale">
|
||||
<action type="add">
|
||||
|
|
Loading…
Reference in New Issue