API cleanup following JPA search performance enhancements

This commit is contained in:
James Agnew 2018-10-03 07:43:01 -04:00
parent 9d1e8aa246
commit 5d5ee78873
18 changed files with 666 additions and 273 deletions

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.rest.param;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,17 +19,17 @@ package ca.uhn.fhir.rest.param;
* limitations under the License.
* #L%
*/
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.base.composite.BaseCodingDt;
import ca.uhn.fhir.model.base.composite.BaseIdentifierDt;
import ca.uhn.fhir.model.primitive.UriDt;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
@ -47,9 +47,8 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
/**
* Constructor which copies the {@link InternalCodingDt#getSystemElement() system} and
* {@link InternalCodingDt#getCodeElement() code} from a {@link InternalCodingDt} instance and adds it as a parameter
*
* @param theCodingDt
* The coding
*
* @param theCodingDt The coding
*/
public TokenParam(BaseCodingDt theCodingDt) {
this(toSystemValue(theCodingDt.getSystemElement()), theCodingDt.getCodeElement().getValue());
@ -59,9 +58,8 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
* Constructor which copies the {@link BaseIdentifierDt#getSystemElement() system} and
* {@link BaseIdentifierDt#getValueElement() value} from a {@link BaseIdentifierDt} instance and adds it as a
* parameter
*
* @param theIdentifierDt
* The identifier
*
* @param theIdentifierDt The identifier
*/
public TokenParam(BaseIdentifierDt theIdentifierDt) {
this(toSystemValue(theIdentifierDt.getSystemElement()), theIdentifierDt.getValueElement().getValue());
@ -81,6 +79,13 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
setText(theText);
}
/**
* Constructor that takes a code but no system
*/
public TokenParam(String theCode) {
this(null, theCode);
}
@Override
String doGetQueryParameterQualifier() {
if (getModifier() != null) {
@ -109,7 +114,7 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
if (theQualifier != null) {
TokenParamModifier modifier = TokenParamModifier.forValue(theQualifier);
setModifier(modifier);
if (modifier == TokenParamModifier.TEXT) {
setSystem(null);
setValue(ParameterUtil.unescape(theParameter));
@ -138,13 +143,18 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
return myModifier;
}
public TokenParam setModifier(TokenParamModifier theModifier) {
myModifier = theModifier;
return this;
}
/**
* Returns the system for this token. Note that if a {@link #getModifier()} is being used, the entire value of the
* parameter will be placed in {@link #getValue() value} and this method will return <code>null</code>.
* parameter will be placed in {@link #getValue() value} and this method will return <code>null</code>.
* <p
* Also note that this value may be <code>null</code> or <code>""</code> (empty string) and that
* each of these have a different meaning. When a token is passed on a URL and it has no
* vertical bar (often meaning "return values that match the given code in any codesystem")
* vertical bar (often meaning "return values that match the given code in any codesystem")
* this method will return <code>null</code>. When a token is passed on a URL and it has
* a vetical bar but nothing before the bar (often meaning "return values that match the
* given code but that have no codesystem) this method will return <code>""</code>
@ -154,14 +164,24 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
return mySystem;
}
public TokenParam setSystem(String theSystem) {
mySystem = theSystem;
return this;
}
/**
* Returns the value for the token (generally the value to the right of the
* vertical bar on the URL)
* vertical bar on the URL)
*/
public String getValue() {
return myValue;
}
public TokenParam setValue(String theValue) {
myValue = theValue;
return this;
}
public InternalCodingDt getValueAsCoding() {
return new InternalCodingDt(mySystem, myValue);
}
@ -181,16 +201,6 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
return myModifier == TokenParamModifier.TEXT;
}
public TokenParam setModifier(TokenParamModifier theModifier) {
myModifier = theModifier;
return this;
}
public TokenParam setSystem(String theSystem) {
mySystem = theSystem;
return this;
}
/**
* @deprecated Use {@link #setModifier(TokenParamModifier)} instead
*/
@ -204,11 +214,6 @@ public class TokenParam extends BaseParam /*implements IQueryParameterType*/ {
return this;
}
public TokenParam setValue(String theValue) {
myValue = theValue;
return this;
}
@Override
public String toString() {
ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.jpa.dao;
import java.util.Iterator;
public interface IResultIterator extends Iterator<Long> {
}

View File

@ -33,7 +33,7 @@ import java.util.Set;
public interface ISearchBuilder {
Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid);
IResultIterator createQuery(SearchParameterMap theParams, String theSearchUuid);
void setMaxResultsToFetch(Integer theMaxResultsToFetch);
@ -53,4 +53,5 @@ public interface ISearchBuilder {
void setType(Class<? extends IBaseResource> theResourceType, String theResourceName);
void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids);
}

View File

@ -1293,7 +1293,7 @@ public class SearchBuilder implements ISearchBuilder {
}
@Override
public Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid) {
public IResultIterator createQuery(SearchParameterMap theParams, String theSearchUuid) {
myParams = theParams;
myBuilder = myEntityManager.getCriteriaBuilder();
mySearchUuid = theSearchUuid;
@ -2142,7 +2142,7 @@ public class SearchBuilder implements ISearchBuilder {
}
private final class QueryIterator extends BaseIterator<Long> implements Iterator<Long> {
private final class QueryIterator extends BaseIterator<Long> implements IResultIterator {
private boolean myFirst = true;
private IncludesIterator myIncludesIterator;
@ -2170,11 +2170,10 @@ public class SearchBuilder implements ISearchBuilder {
// If we don't have a query yet, create one
if (myResultsIterator == null) {
Integer maxResultsToFetch = myMaxResultsToFetch;
if (maxResultsToFetch == null) {
maxResultsToFetch = myCallingDao.getConfig().getFetchSizeDefaultMaximum();
if (myMaxResultsToFetch == null) {
myMaxResultsToFetch = myCallingDao.getConfig().getFetchSizeDefaultMaximum();
}
final TypedQuery<Long> query = createQuery(mySort, maxResultsToFetch, false);
final TypedQuery<Long> query = createQuery(mySort, myMaxResultsToFetch, false);
Query<Long> hibernateQuery = (Query<Long>) query;
hibernateQuery.setFetchSize(myFetchSize);
@ -2261,7 +2260,7 @@ public class SearchBuilder implements ISearchBuilder {
}
}
private class UniqueIndexIterator implements Iterator<Long> {
private class UniqueIndexIterator implements IResultIterator {
private final Set<String> myUniqueQueryStrings;
private Iterator<Long> myWrap = null;

View File

@ -31,9 +31,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -291,6 +291,17 @@ public class SearchParameterMap extends LinkedHashMap<String, List<List<? extend
return this;
}
/**
* This method creates a URL query string representation of the parameters in this
* object, excluding the part before the parameters, e.g.
* <p>
* <code>?name=smith&_sort=Patient:family</code>
* </p>
* <p>
* This method <b>excludes</b> the <code>_count</code> parameter,
* as it doesn't affect the substance of the results returned
* </p>
*/
public String toNormalizedQueryString(FhirContext theCtx) {
StringBuilder b = new StringBuilder();

View File

@ -52,4 +52,8 @@ public interface ISearchDao extends JpaRepository<Search, Long> {
@Modifying
@Query("UPDATE Search s SET s.myDeleted = :deleted WHERE s.myId = :pid")
void updateDeleted(@Param("pid") Long thePid, @Param("deleted") boolean theDeleted);
@Modifying
@Query("DELETE FROM Search s WHERE s.myId = :pid")
void deleteByPid(@Param("pid") Long theId);
}

View File

@ -6,6 +6,7 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -43,4 +44,8 @@ public interface ISearchResultDao extends JpaRepository<SearchResult, Long> {
@Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search")
Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid);
@Modifying
@Query("DELETE FROM SearchResult s WHERE s.myId IN :ids")
void deleteByIds(@Param("ids") List<Long> theContent);
}

View File

@ -24,9 +24,9 @@ import static org.apache.commons.lang3.StringUtils.left;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -52,6 +52,7 @@ public class Search implements Serializable {
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "CREATED", nullable = false, updatable = false)
private Date myCreated;
@OptimisticLock(excluded = true)
@Column(name = "SEARCH_DELETED", nullable = true)
private Boolean myDeleted;
@Column(name = "FAILURE_CODE", nullable = true)
@ -79,7 +80,7 @@ public class Search implements Serializable {
private Long myResourceId;
@Column(name = "RESOURCE_TYPE", length = 200, nullable = true)
private String myResourceType;
@OneToMany(mappedBy = "mySearch")
@OneToMany(mappedBy = "mySearch", fetch = FetchType.LAZY)
private Collection<SearchResult> myResults;
@NotNull
@Temporal(TemporalType.TIMESTAMP)
@ -254,14 +255,6 @@ public class Search implements Serializable {
myStatus = theStatus;
}
/** FIXME: remove */
private static final Logger ourLog = LoggerFactory.getLogger(Search.class);
/** FIXME: remove */
@PrePersist
public void preSave() {
ourLog.info("** PREPERSIST - Version is {}", myVersion);
}
public Integer getTotalCount() {
return myTotalCount;
}

View File

@ -24,10 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.entity.BaseHasResource;
import ca.uhn.fhir.jpa.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.search;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -31,6 +31,7 @@ import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SummaryEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -54,7 +55,6 @@ import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
@ -90,6 +90,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private PlatformTransactionManager myManagedTxManager;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Autowired
private IPagingProvider myPagingProvider;
private int mySyncSize = DEFAULT_SYNC_SIZE;
@ -157,6 +159,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
}
@ -263,42 +266,39 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
return txTemplate.execute(new TransactionCallback<SimpleBundleProvider>() {
@Override
public SimpleBundleProvider doInTransaction(TransactionStatus theStatus) {
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return txTemplate.execute(t -> {
// Load the results synchronously
final List<Long> pids = new ArrayList<>();
// Load the results synchronously
final List<Long> pids = new ArrayList<>();
Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<Long>();
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
List<IBaseResource> resources = new ArrayList<IBaseResource>();
sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
return new SimpleBundleProvider(resources);
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<>();
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
List<IBaseResource> resources = new ArrayList<>();
sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
return new SimpleBundleProvider(resources);
});
}
@ -318,33 +318,30 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
final String resourceType = theResourceType;
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(new TransactionCallback<PersistedJpaBundleProvider>() {
@Override
public PersistedJpaBundleProvider doInTransaction(TransactionStatus theStatus) {
Search searchToUse = null;
PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
Search searchToUse = null;
int hashCode = queryString.hashCode();
Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
for (Search nextCandidateSearch : candidates) {
if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
searchToUse = nextCandidateSearch;
}
int hashCode = queryString.hashCode();
Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
for (Search nextCandidateSearch : candidates) {
if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
searchToUse = nextCandidateSearch;
}
PersistedJpaBundleProvider retVal = null;
if (searchToUse != null) {
ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
searchToUse.setSearchLastReturned(new Date());
mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
retVal.setCacheHit(true);
populateBundleProvider(retVal);
}
return retVal;
}
PersistedJpaBundleProvider retVal = null;
if (searchToUse != null) {
ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
searchToUse.setSearchLastReturned(new Date());
mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
retVal.setCacheHit(true);
populateBundleProvider(retVal);
}
return retVal;
});
if (foundSearchProvider != null) {
@ -425,25 +422,35 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
public abstract class BaseTask implements Callable<Void> {
// FIXME: don't make this protected
protected final Search mySearch;
protected final SearchParameterMap myParams;
protected final IDao myCallingDao;
protected final String myResourceType;
protected final ArrayList<Long> mySyncedPids = new ArrayList<>();
protected final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
protected Search getSearch() {
return getSearch;
}
private final Search getSearch;
private final SearchParameterMap myParams;
private final IDao myCallingDao;
private final String myResourceType;
private final ArrayList<Long> mySyncedPids = new ArrayList<>();
protected CountDownLatch getInitialCollectionLatch() {
return myInitialCollectionLatch;
}
private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
private final CountDownLatch myCompletionLatch;
private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
private boolean myAbortRequested;
private int myCountSaved = 0;
private boolean myAdditionalPrefetchThresholdsRemaining;
private int myTotalNumberSynced;
private List<Long> myPreviouslyAddedResourcePids;
private Integer myMaxResultsToFetch;
private int myCountFetchedDuringThisPass;
/**
* Constructor
*/
protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
mySearch = theSearch;
getSearch = theSearch;
myCallingDao = theCallingDao;
myParams = theParams;
myResourceType = theResourceType;
@ -460,28 +467,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
ISearchBuilder sb = myCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, myResourceType);
// How many results are we pre-loading right now
int currentlyLoaded = defaultIfNull(mySearch.getTotalCount(), 0);
for (Iterator<Integer> iter = myDaoConfig.getPreFetchThresholds().iterator(); iter.hasNext(); ) {
int next = iter.next();
if (next != -1 && next <= currentlyLoaded) {
continue;
}
if (next == -1) {
sb.setMaxResultsToFetch(null);
} else {
sb.setMaxResultsToFetch(next);
}
if (iter.hasNext()) {
myAdditionalPrefetchThresholdsRemaining = true;
}
// If we get here's we've found an appropriate threshold
break;
}
return sb;
}
@ -491,8 +476,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
boolean keepWaiting;
do {
synchronized (mySyncedPids) {
ourLog.trace("Search status is {}", mySearch.getStatus());
keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING;
ourLog.trace("Search status is {}", getSearch.getStatus());
keepWaiting = mySyncedPids.size() < theToIndex && getSearch.getStatus() == SearchStatusEnum.LOADING;
}
if (keepWaiting) {
ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
@ -508,7 +493,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
ArrayList<Long> retVal = new ArrayList<>();
synchronized (mySyncedPids) {
verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
verifySearchHasntFailedOrThrowInternalErrorException(getSearch);
int toIndex = theToIndex;
if (mySyncedPids.size() < toIndex) {
@ -526,7 +511,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
protected void saveSearch() {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
@ -538,17 +523,17 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private void saveUnsynced(final Iterator<Long> theResultIter) {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
if (mySearch.getId() == null) {
if (getSearch.getId() == null) {
doSaveSearch();
}
List<SearchResult> resultsToSave = Lists.newArrayList();
for (Long nextPid : myUnsyncedPids) {
SearchResult nextResult = new SearchResult(mySearch);
SearchResult nextResult = new SearchResult(getSearch);
nextResult.setResourcePid(nextPid);
nextResult.setOrder(myCountSaved++);
resultsToSave.add(nextResult);
@ -558,24 +543,27 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size();
myTotalNumberSynced += numSyncedThisPass;
ourLog.trace("Syncing {} search results", numSyncedThisPass);
mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear();
if (theResultIter.hasNext() == false) {
mySearch.setTotalCount(myCountSaved);
if (myAdditionalPrefetchThresholdsRemaining) {
getSearch.setNumFound(myCountSaved);
if (myMaxResultsToFetch != null && myCountSaved < myMaxResultsToFetch) {
getSearch.setStatus(SearchStatusEnum.FINISHED);
getSearch.setTotalCount(myCountSaved);
} else if (myAdditionalPrefetchThresholdsRemaining) {
ourLog.trace("Setting search status to PASSCMPLET");
mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
mySearch.setSearchParameterMap(myParams);
getSearch.setStatus(SearchStatusEnum.PASSCMPLET);
getSearch.setSearchParameterMap(myParams);
} else {
mySearch.setStatus(SearchStatusEnum.FINISHED);
getSearch.setStatus(SearchStatusEnum.FINISHED);
getSearch.setTotalCount(myCountSaved);
}
}
}
mySearch.setNumFound(myCountSaved);
getSearch.setNumFound(myCountSaved);
int numSynced;
synchronized (mySyncedPids) {
@ -627,7 +615,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
saveSearch();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
@ -635,7 +623,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
});
ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis());
ourLog.info("Completed search for [{}] and found {} resources in {}ms", getSearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis());
} catch (Throwable t) {
@ -668,15 +656,15 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
failureCode = ((BaseServerResponseException) t).getStatusCode();
}
mySearch.setFailureMessage(failureMessage);
mySearch.setFailureCode(failureCode);
mySearch.setStatus(SearchStatusEnum.FAILED);
getSearch.setFailureMessage(failureMessage);
getSearch.setFailureCode(failureCode);
getSearch.setStatus(SearchStatusEnum.FAILED);
saveSearch();
} finally {
myIdToSearchTask.remove(mySearch.getUuid());
myIdToSearchTask.remove(getSearch.getUuid());
myInitialCollectionLatch.countDown();
markComplete();
@ -685,33 +673,44 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
private void doSaveSearch() {
if (mySearch.getId() == null) {
mySearchDao.save(mySearch);
for (SearchInclude next : mySearch.getIncludes()) {
if (getSearch.getId() == null) {
mySearchDao.save(getSearch);
for (SearchInclude next : getSearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
mySearchDao.save(mySearch);
mySearchDao.save(getSearch);
}
}
/**
* This method actually creates the database query to perform the
* search, and starts it.
*/
private void doSearch() {
/*
* If the user has explicitly requested a _count, perform a
*
* SELECT COUNT(*) ....
*
* before doing anything else.
*/
boolean wantCount = myParams.getSummaryMode().contains(SummaryEnum.COUNT);
boolean wantOnlyCount = wantCount && myParams.getSummaryMode().size() == 1;
if (wantCount) {
ISearchBuilder sb = newSearchBuilder();
Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
Iterator<Long> countIterator = sb.createCountQuery(myParams, getSearch.getUuid());
Long count = countIterator.next();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
mySearch.setTotalCount(count.intValue());
getSearch.setTotalCount(count.intValue());
if (wantOnlyCount) {
mySearch.setStatus(SearchStatusEnum.FINISHED);
getSearch.setStatus(SearchStatusEnum.FINISHED);
}
doSaveSearch();
}
@ -722,17 +721,71 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
ISearchBuilder sb = newSearchBuilder();
/*
* Figure out how many results we're actually going to fetch from the
* database in this pass. This calculation takes into consideration the
* "pre-fetch thresholds" specified in DaoConfig#getPreFetchThresholds()
* as well as the value of the _count parameter.
*/
int currentlyLoaded = defaultIfNull(getSearch.getNumFound(), 0);
int minWanted = 0;
if (myParams.getCount() != null) {
minWanted = myParams.getCount();
minWanted = Math.max(minWanted, myPagingProvider.getMaximumPageSize());
minWanted += currentlyLoaded;
}
for (Iterator<Integer> iter = myDaoConfig.getPreFetchThresholds().iterator(); iter.hasNext(); ) {
int next = iter.next();
if (next != -1 && next <= currentlyLoaded) {
continue;
}
if (next == -1) {
sb.setMaxResultsToFetch(null);
} else {
myMaxResultsToFetch = Math.max(next, minWanted);
sb.setMaxResultsToFetch(myMaxResultsToFetch);
}
if (iter.hasNext()) {
myAdditionalPrefetchThresholdsRemaining = true;
}
// If we get here's we've found an appropriate threshold
break;
}
/*
* Provide any PID we loaded in previous seasrch passes to the
* SearchBuilder so that we don't get duplicates coming from running
* the same query again.
*
* We could possibly accomplish this in a different way by using sorted
* results in our SQL query and specifying an offset. I don't actually
* know if that would be faster or not. At some point should test this
* idea.
*/
if (myPreviouslyAddedResourcePids != null) {
sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
mySyncedPids.addAll(myPreviouslyAddedResourcePids);
}
Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearch.getUuid());
/*
* Construct the SQL query we'll be sending to the database
*/
Iterator<Long> theResultIterator = sb.createQuery(myParams, getSearch.getUuid());
/*
* The following loop actually loads the PIDs of the resources
* matching the search off of the disk and into memory. After
* every X results, we commit to the HFJ_SEARCH table.
*/
int syncSize = mySyncSize;
while (theResultIterator.hasNext()) {
myUnsyncedPids.add(theResultIterator.next());
myCountFetchedDuringThisPass++;
boolean shouldSync = myUnsyncedPids.size() >= syncSize;
@ -783,14 +836,14 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.afterPropertiesSet();
txTemplate.execute(t -> {
List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(mySearch);
List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(getSearch());
setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
return null;
});
} catch (Throwable e) {
ourLog.error("Failure processing search", e);
mySearch.setFailureMessage(e.toString());
mySearch.setStatus(SearchStatusEnum.FAILED);
getSearch().setFailureMessage(e.toString());
getSearch().setStatus(SearchStatusEnum.FAILED);
saveSearch();
return null;
@ -836,17 +889,17 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
ourLog.trace("Awaiting initial sync");
do {
try {
if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
// Shouldn't happen
throw new InternalErrorException(e);
}
} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
ourLog.trace("Initial sync completed");
return mySearch.getTotalCount();
return getSearch().getTotalCount();
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.search;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -24,23 +24,20 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.Search;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.InstantType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.util.Date;
/**
@ -49,6 +46,12 @@ import java.util.Date;
public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
public static final long DEFAULT_CUTOFF_SLACK = 10 * DateUtils.MILLIS_PER_SECOND;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StaleSearchDeletingSvcImpl.class);
/*
* Be careful increasing this number! We use the number of params here in a
* DELETE FROM foo WHERE params IN (aaaa)
* type query and this can fail if we have 1000s of params
*/
public static int ourMaximumResultsToDelete = 500;
private static Long ourNowForUnitTests;
/*
* We give a bit of extra leeway just to avoid race conditions where a query result
@ -66,6 +69,8 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
private ISearchResultDao mySearchResultDao;
@Autowired
private PlatformTransactionManager myTransactionManager;
@PersistenceContext()
private EntityManager myEntityManager;
private void deleteSearch(final Long theSearchPid) {
mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> {
@ -73,22 +78,22 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
mySearchIncludeDao.deleteForSearch(searchToDelete.getId());
/*
* Note, we're only deleting up to 1000 results in an individual search here. This
* Note, we're only deleting up to 500 results in an individual search here. This
* is to prevent really long running transactions in cases where there are
* huge searches with tons of results in them. By the time we've gotten here
* we have marked the parent Search entity as deleted, so it's not such a
* huge deal to be only partially deleting search results. They'll get deleted
* eventually
*/
int max = 10000;
int max = ourMaximumResultsToDelete;
Slice<Long> resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId());
for (Long next : resultPids) {
mySearchResultDao.deleteById(next);
if (resultPids.hasContent()) {
mySearchResultDao.deleteByIds(resultPids.getContent());
}
// Only delete if we don't have results left in this search
if (resultPids.getNumberOfElements() < max) {
mySearchDao.delete(searchToDelete);
mySearchDao.deleteByPid(searchToDelete.getId());
}
});
}
@ -114,15 +119,16 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
TransactionTemplate tt = new TransactionTemplate(myTransactionManager);
final Slice<Long> toDelete = tt.execute(theStatus ->
mySearchDao.findWhereLastReturnedBefore(cutoff, new PageRequest(0, 1000))
mySearchDao.findWhereLastReturnedBefore(cutoff, PageRequest.of(0, 1000))
);
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
tt.execute(t->{
tt.execute(t -> {
mySearchDao.updateDeleted(nextSearchToDelete, true);
return null;
});
tt.execute(t->{
tt.execute(t -> {
deleteSearch(nextSearchToDelete);
return null;
});
@ -130,12 +136,7 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
int count = toDelete.getContent().size();
if (count > 0) {
long total = tt.execute(new TransactionCallback<Long>() {
@Override
public Long doInTransaction(TransactionStatus theStatus) {
return mySearchDao.count();
}
});
long total = tt.execute(t -> mySearchDao.count());
ourLog.info("Deleted {} searches, {} remaining", count, total);
}
@ -155,6 +156,11 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
myCutoffSlack = theCutoffSlack;
}
@VisibleForTesting
public static void setMaximumResultsToDeleteForUnitTest(int theMaximumResultsToDelete) {
ourMaximumResultsToDelete = theMaximumResultsToDelete;
}
private static long now() {
if (ourNowForUnitTests != null) {
return ourNowForUnitTests;

View File

@ -105,7 +105,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
DataSource dataSource = ProxyDataSourceBuilder
.create(retVal)
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
.logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
.countQuery(new ThreadQueryCountHolder())
.build();

View File

@ -16,6 +16,7 @@ import ca.uhn.fhir.model.dstu2.resource.Bundle.Entry;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.IRequestOperationCallback;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.BundleUtil;
@ -54,17 +55,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.TestUtil.randomizeLocale;
import static org.junit.Assert.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public abstract class BaseJpaTest {
static {
System.setProperty(Constants.TEST_SYSTEM_PROP_VALIDATION_RESOURCE_CACHES_MS, "1000");
}
protected static final String CM_URL = "http://example.com/my_concept_map";
protected static final String CS_URL = "http://example.com/my_code_system";
protected static final String CS_URL_2 = "http://example.com/my_code_system2";
@ -73,11 +71,18 @@ public abstract class BaseJpaTest {
protected static final String VS_URL = "http://example.com/my_value_set";
protected static final String VS_URL_2 = "http://example.com/my_value_set2";
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseJpaTest.class);
static {
System.setProperty(Constants.TEST_SYSTEM_PROP_VALIDATION_RESOURCE_CACHES_MS, "1000");
}
@Rule
public LoggingRule myLoggingRule = new LoggingRule();
protected ServletRequestDetails mySrd;
protected ArrayList<IServerInterceptor> myServerInterceptorList;
protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class);
@Autowired
protected DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
@After
public void afterPerformCleanup() {
@ -146,6 +151,16 @@ public abstract class BaseJpaTest {
});
}
public <T> T runInTransaction(Callable<T> theRunnable) {
return newTxTemplate().execute(t->{
try {
return theRunnable.call();
} catch (Exception theE) {
throw new InternalErrorException(theE);
}
});
}
/**
* Sleep until at least 1 ms has elapsed
*/
@ -218,10 +233,6 @@ public abstract class BaseJpaTest {
return toUnqualifiedVersionlessIdValues(theFound, fromIndex, toIndex, true);
}
@Autowired
private DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
protected List<String> toUnqualifiedVersionlessIdValues(IBundleProvider theFound, int theFromIndex, Integer theToIndex, boolean theFirstCall) {
if (theToIndex == null) {
theToIndex = 99999;
@ -353,7 +364,7 @@ public abstract class BaseJpaTest {
boolean expungeEnabled = theDaoConfig.isExpungeEnabled();
theDaoConfig.setExpungeEnabled(true);
for (int count = 0;; count++) {
for (int count = 0; ; count++) {
try {
theSystemDao.expunge(new ExpungeOptions().setExpungeEverything(true));
break;
@ -408,7 +419,7 @@ public abstract class BaseJpaTest {
return "null";
}
if (t instanceof IBaseResource) {
return ((IBaseResource)t).getIdElement().getValue();
return ((IBaseResource) t).getIdElement().getValue();
}
return t.toString();
})

View File

@ -210,6 +210,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Autowired
protected ISearchResultDao mySearchResultDao;
@Autowired
protected ISearchIncludeDao mySearchIncludeDao;
@Autowired
@Qualifier("mySearchParameterDaoR4")
protected IFhirResourceDao<SearchParameter> mySearchParameterDao;
@Autowired

View File

@ -4,8 +4,11 @@ import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.SummaryEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.collect.Sets;
import org.hl7.fhir.r4.model.Patient;
import org.junit.After;
import org.junit.AfterClass;
@ -20,7 +23,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@SuppressWarnings({"unchecked", "deprecation", "Duplicates"})
public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
@ -45,6 +51,90 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
}
@Test
public void testFetchCountOnly() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertThat(ids, empty());
assertEquals(200, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
}
@Test
public void testFetchCountAndData() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
params.setSummaryMode(Sets.newHashSet(SummaryEnum.COUNT, SummaryEnum.DATA));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
assertEquals(200, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
}
@Test
public void testFetchRightUpToActualNumberExistingThenFetchAnotherPage() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(200, -1));
/*
* Load the first page of 200
*/
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 200, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00199", ids.get(199));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* 20 should be prefetched since that's the initial page size
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(200, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertNull(search.getTotalCount());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
/*
* Now load a page that crosses the next threshold
*/
ids = toUnqualifiedVersionlessIdValues(results, 200, 400, false);
assertThat(ids, empty());
/*
* Search gets incremented twice as a part of loading the next batch
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
assertEquals(200, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(200, search.getTotalCount().intValue());
assertEquals(3, search.getVersion().intValue());
});
}
@Test
public void testFetchOnlySmallBatches() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
@ -60,6 +150,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* 20 should be prefetched since that's the initial page size
@ -67,8 +158,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(20, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(20, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertNull(search.getTotalCount());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
@ -83,6 +175,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
ids = toUnqualifiedVersionlessIdValues(results, 10, 15, false);
assertEquals("Patient/PT00010", ids.get(0));
assertEquals("Patient/PT00014", ids.get(4));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* Search should be untouched
@ -105,10 +198,11 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(50, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(3, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
assertEquals(50, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertNull(search.getTotalCount());
assertEquals(3, search.getVersion().intValue());
});
/*
@ -121,6 +215,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
ids = toUnqualifiedVersionlessIdValues(results, 25, 30, false);
assertEquals("Patient/PT00025", ids.get(0));
assertEquals("Patient/PT00029", ids.get(4));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* Search should be untouched
@ -143,8 +238,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(190, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(190, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(5, search.getVersion().intValue());
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
});
@ -157,12 +253,103 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
assertEquals(10, ids.size());
assertEquals("Patient/PT00180", ids.get(0));
assertEquals("Patient/PT00189", ids.get(9));
assertEquals(190, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
}
@Test
public void testFetchMoreThanFirstPageSizeInFirstPage() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, -1));
/*
* Load a page that exceeds the initial page siz
*/
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
params.setCount(50);
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 50, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00049", ids.get(49));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* 20 should be prefetched since that's the initial page size
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(50, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(null, search.getTotalCount());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
assertEquals(1, search.getVersion().intValue());
});
}
@Test
public void testFetchUnlimited() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, -1));
/*
* Load the first page of 10
*/
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
assertNull(myDatabaseBackedPagingProvider.retrieveResultList(uuid).size());
/*
* 20 should be prefetched since that's the initial page size
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(20, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertNull(search.getTotalCount());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
/*
* Load a few more that shouldn't require a new page fetch
*/
params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
results = myPatientDao.search(params);
ids = toUnqualifiedVersionlessIdValues(results, 15, 25, false);
assertEquals("Patient/PT00015", ids.get(0));
assertEquals("Patient/PT00024", ids.get(9));
assertEquals(200, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
/*
* Search should be untouched
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(200, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(200, search.getTotalCount().intValue());
assertEquals(3, search.getVersion().intValue());
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
});
}
@Test
public void testFetchSecondBatchInManyThreads() throws Throwable {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, -1));
@ -178,6 +365,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
assertNull(results.size());
/*
* 20 should be prefetched since that's the initial page size
@ -185,8 +373,9 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(20, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(20, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertNull(search.getTotalCount());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
@ -220,14 +409,42 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
for (Future<Throwable> next : futures) {
Throwable t = next.get();
if (t!=null) {
if (t != null) {
throw t;
}
}
executor.shutdownNow();
}
@AfterClass
@Test
public void testSearchThatOnlyReturnsASmallResult() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
params.add(Patient.SP_RES_ID, new TokenParam("PT00000"));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals(1, ids.size());
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
assertEquals(1, search.getNumFound());
assertEquals(search.getNumFound(), mySearchResultDao.count());
assertEquals(1, search.getTotalCount().intValue());
assertEquals(1, search.getVersion().intValue());
});
assertEquals(1, myDatabaseBackedPagingProvider.retrieveResultList(uuid).size().intValue());
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}

View File

@ -1,12 +1,12 @@
package ca.uhn.fhir.jpa.provider.r4;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.gclient.IQuery;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.TestUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleLinkComponent;
import org.hl7.fhir.r4.model.Patient;
@ -16,29 +16,26 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.test.util.AopTestUtils;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.gclient.IQuery;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.TestUtil;
import java.util.Date;
import java.util.UUID;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StaleSearchDeletingSvcR4Test.class);
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
@Override
@After()
public void after() throws Exception {
super.after();
StaleSearchDeletingSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(myStaleSearchDeletingSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(StaleSearchDeletingSvcImpl.DEFAULT_CUTOFF_SLACK);
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(10000);
}
@Override
@Before
public void before() throws Exception {
super.before();
@ -94,4 +91,75 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
}
}
@Test
public void testDeleteVeryLargeSearch() {
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
runInTransaction(() -> {
Search search = new Search();
search.setStatus(SearchStatusEnum.FINISHED);
search.setUuid(UUID.randomUUID().toString());
search.setCreated(DateUtils.addDays(new Date(), -10000));
search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient");
search.setSearchLastReturned(DateUtils.addDays(new Date(), -10000));
search = mySearchEntityDao.save(search);
for (int i = 0; i < 15; i++) {
ResourceTable resource = new ResourceTable();
resource.setPublished(new Date());
resource.setUpdated(new Date());
resource.setResourceType("Patient");
resource = myResourceTableDao.saveAndFlush(resource);
SearchResult sr = new SearchResult(search);
sr.setOrder(i);
sr.setResourcePid(resource.getId());
mySearchResultDao.save(sr);
}
SearchInclude si = new SearchInclude(search, "Patient:name", false, false);
mySearchIncludeDao.save(si);
});
// It should take two passes to delete the search fully
assertEquals(1, mySearchEntityDao.count());
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
assertEquals(1, mySearchEntityDao.count());
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
assertEquals(0, mySearchEntityDao.count());
}
@Test
public void testDeleteVerySmallSearch() {
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
runInTransaction(() -> {
Search search = new Search();
search.setStatus(SearchStatusEnum.FINISHED);
search.setUuid(UUID.randomUUID().toString());
search.setCreated(DateUtils.addDays(new Date(), -10000));
search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient");
search.setSearchLastReturned(DateUtils.addDays(new Date(), -10000));
search = mySearchEntityDao.save(search);
});
// It should take one pass to delete the search fully
assertEquals(1, mySearchEntityDao.count());
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
assertEquals(0, mySearchEntityDao.count());
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -1,10 +1,7 @@
package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
@ -140,7 +137,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new FailAfterNIterator<Long>(new SlowIterator<Long>(pids.iterator(), 2), 300);
IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective());
@ -161,7 +158,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 1);
IResultIterator iter = new SlowIterator(pids.iterator(), 1);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -197,7 +194,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
SlowIterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
SlowIterator iter = new SlowIterator(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -225,7 +222,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
IResultIterator iter = new SlowIterator(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -249,7 +246,7 @@ public class SearchCoordinatorSvcImplTest {
assertEquals("19", resources.get(9).getIdElement().getValueAsString());
when(mySearchDao.findByUuid(eq(result.getUuid()))).thenReturn(search);
/*
* Now call from a new bundle provider. This simulates a separate HTTP
* client request coming in.
@ -275,7 +272,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 100);
SlowIterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
SlowIterator iter = new SlowIterator(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -338,7 +335,7 @@ public class SearchCoordinatorSvcImplTest {
search.setStatus(SearchStatusEnum.FINISHED);
}
}.start();
/*
* Now call from a new bundle provider. This simulates a separate HTTP
* client request coming in.
@ -365,7 +362,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(pids.iterator());
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(new ResultIterator(pids.iterator()));
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(eq(pids), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -386,7 +383,7 @@ public class SearchCoordinatorSvcImplTest {
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(pids.iterator());
when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(new ResultIterator(pids.iterator()));
pids = createPidSequence(10, 110);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(eq(pids), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
@ -401,17 +398,12 @@ public class SearchCoordinatorSvcImplTest {
assertEquals("109", resources.get(99).getIdElement().getValueAsString());
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
public static class FailAfterNIterator<T> extends BaseIterator<T> implements Iterator<T> {
public static class FailAfterNIterator extends BaseIterator<Long> implements IResultIterator {
private int myCount;
private Iterator<T> myWrap;
private IResultIterator myWrap;
public FailAfterNIterator(Iterator<T> theWrap, int theCount) {
public FailAfterNIterator(IResultIterator theWrap, int theCount) {
myWrap = theWrap;
myCount = theCount;
}
@ -422,7 +414,7 @@ public class SearchCoordinatorSvcImplTest {
}
@Override
public T next() {
public Long next() {
myCount--;
if (myCount == 0) {
throw new NullPointerException("FAILED");
@ -432,13 +424,31 @@ public class SearchCoordinatorSvcImplTest {
}
public static class ResultIterator extends BaseIterator<Long> implements IResultIterator {
public static class SlowIterator<T> extends BaseIterator<T> implements Iterator<T> {
private final Iterator<Long> myWrap;
public ResultIterator(Iterator<Long> theWrap) {
myWrap = theWrap;
}
@Override
public boolean hasNext() {
return myWrap.hasNext();
}
@Override
public Long next() {
return myWrap.next();
}
}
public static class SlowIterator extends BaseIterator<Long> implements IResultIterator {
private int myDelay;
private Iterator<T> myWrap;
private Iterator<Long> myWrap;
public SlowIterator(Iterator<T> theWrap, int theDelay) {
public SlowIterator(Iterator<Long> theWrap, int theDelay) {
myWrap = theWrap;
myDelay = theDelay;
}
@ -449,7 +459,7 @@ public class SearchCoordinatorSvcImplTest {
}
@Override
public T next() {
public Long next() {
try {
Thread.sleep(myDelay);
} catch (InterruptedException e) {
@ -460,4 +470,9 @@ public class SearchCoordinatorSvcImplTest {
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -136,7 +136,7 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
requestValidator.setFailOnSeverity(null);
requestValidator.setAddResponseHeaderOnSeverity(null);
requestValidator.setAddResponseOutcomeHeaderOnSeverity(ResultSeverityEnum.INFORMATION);
// requestValidator.addValidatorModule(instanceValidatorDstu3());
requestValidator.addValidatorModule(instanceValidatorDstu3());
requestValidator.setIgnoreValidatorExceptions(true);
return requestValidator;