Make JPA searches more efficient by prefetching a much smaller number of resources
This commit is contained in:
James Agnew 2018-10-01 21:36:10 -04:00
parent 1cfaeadcb8
commit 12b23fbf23
20 changed files with 43528 additions and 42909 deletions

View File

@ -1,5 +1,26 @@
package org.hl7.fhir.convertors; package org.hl7.fhir.convertors;
/*-
* #%L
* HAPI FHIR - Converter
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.utilities.Utilities; import org.hl7.fhir.utilities.Utilities;
public class VersionConvertorConstants { public class VersionConvertorConstants {

View File

@ -1,5 +1,26 @@
package org.hl7.fhir.convertors; package org.hl7.fhir.convertors;
/*-
* #%L
* HAPI FHIR - Converter
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -1,5 +1,26 @@
package org.hl7.fhir.convertors; package org.hl7.fhir.convertors;
/*-
* #%L
* HAPI FHIR - Converter
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
/* /*
Copyright (c) 2011+, HL7, Inc. Copyright (c) 2011+, HL7, Inc.
All rights reserved. All rights reserved.

View File

@ -1,5 +1,26 @@
package org.hl7.fhir.convertors; package org.hl7.fhir.convertors;
/*-
* #%L
* HAPI FHIR - Converter
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -152,6 +152,7 @@ public class DaoConfig {
private int myReindexThreadCount; private int myReindexThreadCount;
private Set<String> myBundleTypesAllowedForStorage; private Set<String> myBundleTypesAllowedForStorage;
private boolean myValidateSearchParameterExpressionsOnSave = true; private boolean myValidateSearchParameterExpressionsOnSave = true;
private List<Integer> myPreFetchThresholds = Arrays.asList(500, 2000, -1);
/** /**
* Constructor * Constructor
@ -164,7 +165,6 @@ public class DaoConfig {
setReindexThreadCount(Runtime.getRuntime().availableProcessors()); setReindexThreadCount(Runtime.getRuntime().availableProcessors());
setBundleTypesAllowedForStorage(DEFAULT_BUNDLE_TYPES_ALLOWED_FOR_STORAGE); setBundleTypesAllowedForStorage(DEFAULT_BUNDLE_TYPES_ALLOWED_FOR_STORAGE);
if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) { if ("true".equalsIgnoreCase(System.getProperty(DISABLE_STATUS_BASED_REINDEX))) {
ourLog.info("Status based reindexing is DISABLED"); ourLog.info("Status based reindexing is DISABLED");
setStatusBasedReindexingDisabled(true); setStatusBasedReindexingDisabled(true);
@ -493,13 +493,6 @@ public class DaoConfig {
return myInterceptors; return myInterceptors;
} }
/**
* This may be used to optionally register server interceptors directly against the DAOs.
*/
public void setInterceptors(List<IServerInterceptor> theInterceptors) {
myInterceptors = theInterceptors;
}
/** /**
* This may be used to optionally register server interceptors directly against the DAOs. * This may be used to optionally register server interceptors directly against the DAOs.
*/ */
@ -510,6 +503,13 @@ public class DaoConfig {
} }
} }
/**
* This may be used to optionally register server interceptors directly against the DAOs.
*/
public void setInterceptors(List<IServerInterceptor> theInterceptors) {
myInterceptors = theInterceptors;
}
/** /**
* See {@link #setMaximumExpansionSize(int)} * See {@link #setMaximumExpansionSize(int)}
*/ */
@ -1321,6 +1321,50 @@ public class DaoConfig {
setSubscriptionPurgeInactiveAfterMillis(theSeconds * DateUtils.MILLIS_PER_SECOND); setSubscriptionPurgeInactiveAfterMillis(theSeconds * DateUtils.MILLIS_PER_SECOND);
} }
/**
* This setting sets the number of search results to prefetch. For example, if this list
* is set to [100, 1000, -1] then the server will initially load 100 results and not
* attempt to load more. If the user requests subsequent page(s) of results and goes
* past 100 results, the system will load the next 900 (up to the following threshold of 1000).
* The system will progressively work through these thresholds.
*
* <p>
* A threshold of -1 means to load all results. Note that if the final threshold is a
* number other than <code>-1</code>, the system will never prefetch more than the
* given number.
* </p>
*/
/**
 * This setting sets the number of search results to prefetch. For example, if this list
 * is set to [100, 1000, -1] then the server will initially load 100 results and not
 * attempt to load more. If the user requests subsequent page(s) of results and goes
 * past 100 results, the system will load the next 900 (up to the following threshold of 1000).
 * The system will progressively work through these thresholds.
 *
 * <p>
 * A threshold of -1 means to load all results. Note that if the final threshold is a
 * number other than <code>-1</code>, the system will never prefetch more than the
 * given number.
 * </p>
 *
 * @throws IllegalArgumentException if the list is empty, contains a value that is
 *                                  neither positive nor -1, is not strictly ascending,
 *                                  or contains any entry after a -1
 */
public void setSearchPreFetchThresholds(List<Integer> thePreFetchThresholds) {
	Validate.isTrue(thePreFetchThresholds.size() > 0, "thePreFetchThresholds must not be empty");
	int last = 0;
	for (Integer nextInteger : thePreFetchThresholds) {
		int nextInt = nextInteger.intValue();
		// Each threshold is either a positive count or the "load everything" sentinel
		Validate.isTrue(nextInt > 0 || nextInt == -1, nextInt + " is not a valid prefetch threshold");
		// -1 (if present) must be the final entry
		Validate.isTrue(last != -1, "Prefetch thresholds must not contain any thresholds after -1");
		// Strictly ascending order (this also rejects duplicates)
		Validate.isTrue(nextInt > last || nextInt == -1, "Prefetch thresholds must be in strictly ascending order");
		last = nextInt;
	}
	myPreFetchThresholds = thePreFetchThresholds;
}
/**
 * Returns the search prefetch thresholds. See
 * {@link #setSearchPreFetchThresholds(List)} for a full description: the
 * server loads results up to each threshold in turn as the client pages
 * through them, and a final value of <code>-1</code> means "load all
 * remaining results".
 */
public List<Integer> getPreFetchThresholds() {
	return myPreFetchThresholds;
}
public enum IndexEnabledEnum { public enum IndexEnabledEnum {
ENABLED, ENABLED,
DISABLED DISABLED

View File

@ -35,7 +35,9 @@ public interface ISearchBuilder {
Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid); Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid);
Iterator createCountQuery(SearchParameterMap theParams, String theSearchUuid); void setMaxResultsToFetch(Integer theMaxResultsToFetch);
Iterator<Long> createCountQuery(SearchParameterMap theParams, String theSearchUuid);
void loadResourcesByPid(Collection<Long> theIncludePids, List<IBaseResource> theResourceListToPopulate, Set<Long> theRevIncludedPids, boolean theForHistoryOperation, EntityManager theEntityManager, void loadResourcesByPid(Collection<Long> theIncludePids, List<IBaseResource> theResourceListToPopulate, Set<Long> theRevIncludedPids, boolean theForHistoryOperation, EntityManager theEntityManager,
FhirContext theContext, IDao theDao); FhirContext theContext, IDao theDao);
@ -50,4 +52,5 @@ public interface ISearchBuilder {
void setType(Class<? extends IBaseResource> theResourceType, String theResourceName); void setType(Class<? extends IBaseResource> theResourceType, String theResourceName);
void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids);
} }

View File

@ -70,6 +70,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.TypedQuery; import javax.persistence.TypedQuery;
import javax.persistence.criteria.*; import javax.persistence.criteria.*;
@ -115,6 +116,8 @@ public class SearchBuilder implements ISearchBuilder {
private String mySearchUuid; private String mySearchUuid;
private IHapiTerminologySvc myTerminologySvc; private IHapiTerminologySvc myTerminologySvc;
private int myFetchSize; private int myFetchSize;
private Integer myMaxResultsToFetch;
private Set<Long> myPidSet;
/** /**
* Constructor * Constructor
@ -136,6 +139,11 @@ public class SearchBuilder implements ISearchBuilder {
myResourceSearchViewDao = theResourceViewDao; myResourceSearchViewDao = theResourceViewDao;
} }
// Sets a cap on how many result PIDs this builder's query will load.
// A null value means "no explicit cap"; the query iterator then falls back to
// DaoConfig#getFetchSizeDefaultMaximum() when creating the query.
@Override
public void setMaxResultsToFetch(Integer theMaxResultsToFetch) {
myMaxResultsToFetch = theMaxResultsToFetch;
}
private void addPredicateComposite(String theResourceName, RuntimeSearchParam theParamDef, List<? extends IQueryParameterType> theNextAnd) { private void addPredicateComposite(String theResourceName, RuntimeSearchParam theParamDef, List<? extends IQueryParameterType> theNextAnd) {
// TODO: fail if missing is set for a composite query // TODO: fail if missing is set for a composite query
@ -1276,6 +1284,14 @@ public class SearchBuilder implements ISearchBuilder {
return new CountQueryIterator(query); return new CountQueryIterator(query);
} }
/**
 * Seeds the set of resource PIDs that were already returned by a previous
 * pass of this search, so the new query will not emit them again.
 *
 * @param thePidSet May be null, which is treated the same as an empty list
 */
@Override
public void setPreviouslyAddedResourcePids(@Nullable List<Long> thePidSet) {
	// BUGFIX: the contract promises null is accepted, but new HashSet<>(null)
	// throws NullPointerException - guard explicitly
	if (thePidSet != null) {
		myPidSet = new HashSet<>(thePidSet);
	} else {
		myPidSet = new HashSet<>();
	}
}
@Override @Override
public Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid) { public Iterator<Long> createQuery(SearchParameterMap theParams, String theSearchUuid) {
myParams = theParams; myParams = theParams;
@ -1332,6 +1348,11 @@ public class SearchBuilder implements ISearchBuilder {
ourLastHandlerMechanismForUnitTest = HandlerTypeEnum.STANDARD_QUERY; ourLastHandlerMechanismForUnitTest = HandlerTypeEnum.STANDARD_QUERY;
ourLastHandlerThreadForUnitTest = Thread.currentThread().getName(); ourLastHandlerThreadForUnitTest = Thread.currentThread().getName();
} }
if (myPidSet == null) {
myPidSet = new HashSet<>();
}
return new QueryIterator(); return new QueryIterator();
} }
@ -2028,7 +2049,7 @@ public class SearchBuilder implements ISearchBuilder {
} }
IQueryParameterType leftParam = toParameterType(compositeOf.get(0)); IQueryParameterType leftParam = toParameterType(compositeOf.get(0));
IQueryParameterType rightParam = toParameterType(compositeOf.get(1)); IQueryParameterType rightParam = toParameterType(compositeOf.get(1));
qp = new CompositeParam<IQueryParameterType, IQueryParameterType>(leftParam, rightParam); qp = new CompositeParam<>(leftParam, rightParam);
break; break;
case REFERENCE: case REFERENCE:
qp = new ReferenceParam(); qp = new ReferenceParam();
@ -2072,7 +2093,7 @@ public class SearchBuilder implements ISearchBuilder {
private int myPageSize = myCallingDao.getConfig().getEverythingIncludesFetchPageSize(); private int myPageSize = myCallingDao.getConfig().getEverythingIncludesFetchPageSize();
public IncludesIterator(Set<Long> thePidSet) { public IncludesIterator(Set<Long> thePidSet) {
myCurrentPids = new ArrayList<Long>(thePidSet); myCurrentPids = new ArrayList<>(thePidSet);
myCurrentIterator = EMPTY_LONG_LIST.iterator(); myCurrentIterator = EMPTY_LONG_LIST.iterator();
myCurrentOffset = 0; myCurrentOffset = 0;
} }
@ -2123,7 +2144,6 @@ public class SearchBuilder implements ISearchBuilder {
private final class QueryIterator extends BaseIterator<Long> implements Iterator<Long> { private final class QueryIterator extends BaseIterator<Long> implements Iterator<Long> {
private final Set<Long> myPidSet = new HashSet<Long>();
private boolean myFirst = true; private boolean myFirst = true;
private IncludesIterator myIncludesIterator; private IncludesIterator myIncludesIterator;
private Long myNext; private Long myNext;
@ -2150,9 +2170,11 @@ public class SearchBuilder implements ISearchBuilder {
// If we don't have a query yet, create one // If we don't have a query yet, create one
if (myResultsIterator == null) { if (myResultsIterator == null) {
Integer maximumResults = myCallingDao.getConfig().getFetchSizeDefaultMaximum(); Integer maxResultsToFetch = myMaxResultsToFetch;
if (maxResultsToFetch == null) {
final TypedQuery<Long> query = createQuery(mySort, maximumResults, false); maxResultsToFetch = myCallingDao.getConfig().getFetchSizeDefaultMaximum();
}
final TypedQuery<Long> query = createQuery(mySort, maxResultsToFetch, false);
Query<Long> hibernateQuery = (Query<Long>) query; Query<Long> hibernateQuery = (Query<Long>) query;
hibernateQuery.setFetchSize(myFetchSize); hibernateQuery.setFetchSize(myFetchSize);

View File

@ -9,6 +9,10 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/* /*
* #%L * #%L
* HAPI FHIR JPA Server * HAPI FHIR JPA Server
@ -34,6 +38,9 @@ public interface ISearchResultDao extends JpaRepository<SearchResult, Long> {
@Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearch = :search ORDER BY r.myOrder ASC") @Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearch = :search ORDER BY r.myOrder ASC")
Page<Long> findWithSearchUuid(@Param("search") Search theSearch, Pageable thePage); Page<Long> findWithSearchUuid(@Param("search") Search theSearch, Pageable thePage);
@Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearch = :search ORDER BY r.myOrder ASC")
List<Long> findWithSearchUuid(@Param("search") Search theSearch);
@Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search") @Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search")
Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid); Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid);
} }

View File

@ -1,7 +1,12 @@
package ca.uhn.fhir.jpa.entity; package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.param.DateRangeParam; import ca.uhn.fhir.rest.param.DateRangeParam;
import org.apache.commons.lang3.SerializationUtils;
import org.hibernate.annotations.OptimisticLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.persistence.*; import javax.persistence.*;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -79,6 +84,7 @@ public class Search implements Serializable {
@NotNull @NotNull
@Temporal(TemporalType.TIMESTAMP) @Temporal(TemporalType.TIMESTAMP)
@Column(name = "SEARCH_LAST_RETURNED", nullable = false, updatable = false) @Column(name = "SEARCH_LAST_RETURNED", nullable = false, updatable = false)
@OptimisticLock(excluded = true)
private Date mySearchLastReturned; private Date mySearchLastReturned;
@Lob() @Lob()
@Basic(fetch = FetchType.LAZY) @Basic(fetch = FetchType.LAZY)
@ -96,6 +102,13 @@ public class Search implements Serializable {
private Integer myTotalCount; private Integer myTotalCount;
@Column(name = "SEARCH_UUID", length = UUID_COLUMN_LENGTH, nullable = false, updatable = false) @Column(name = "SEARCH_UUID", length = UUID_COLUMN_LENGTH, nullable = false, updatable = false)
private String myUuid; private String myUuid;
@SuppressWarnings("unused")
@Version
@Column(name = "OPTLOCK_VERSION", nullable = true)
private Integer myVersion;
@Lob
@Column(name = "SEARCH_PARAM_MAP", nullable = true)
private byte[] mySearchParameterMap;
/** /**
* Constructor * Constructor
@ -241,6 +254,14 @@ public class Search implements Serializable {
myStatus = theStatus; myStatus = theStatus;
} }
/** FIXME: remove - temporary diagnostic logger for optimistic-lock debugging */
private static final Logger ourLog = LoggerFactory.getLogger(Search.class);

/**
 * Temporary diagnostic JPA lifecycle callback: logs the optimistic-lock
 * version just before each insert. FIXME: remove before release.
 */
@PrePersist
public void preSave() {
	// TRACE rather than INFO so that routine entity persists do not flood
	// production logs while this diagnostic remains in place
	ourLog.trace("** PREPERSIST - Version is {}", myVersion);
}
public Integer getTotalCount() { public Integer getTotalCount() {
return myTotalCount; return myTotalCount;
} }
@ -267,7 +288,7 @@ public class Search implements Serializable {
} }
private Set<Include> toIncList(boolean theWantReverse) { private Set<Include> toIncList(boolean theWantReverse) {
HashSet<Include> retVal = new HashSet<Include>(); HashSet<Include> retVal = new HashSet<>();
for (SearchInclude next : getIncludes()) { for (SearchInclude next : getIncludes()) {
if (theWantReverse == next.isReverse()) { if (theWantReverse == next.isReverse()) {
retVal.add(new Include(next.getInclude(), next.isRecurse())); retVal.add(new Include(next.getInclude(), next.isRecurse()));
@ -287,4 +308,16 @@ public class Search implements Serializable {
public void addInclude(SearchInclude theInclude) { public void addInclude(SearchInclude theInclude) {
getIncludes().add(theInclude); getIncludes().add(theInclude);
} }
/**
 * Returns the optimistic-locking version column value, or null if this
 * entity has never been persisted.
 */
public Integer getVersion() {
	return myVersion;
}

/**
 * Deserializes and returns the {@link SearchParameterMap} this search was
 * created from, or null if no serialized map was stored.
 */
public SearchParameterMap getSearchParameterMap() {
	// BUGFIX: guard against SerializationUtils.deserialize(null) throwing -
	// e.g. rows created before this column existed, or searches saved
	// without a parameter map
	if (mySearchParameterMap == null) {
		return null;
	}
	// NOTE(review): Java native serialization of our own column data - not
	// untrusted input today, but worth revisiting if this ever crosses a
	// trust boundary
	return SerializationUtils.deserialize(mySearchParameterMap);
}

public void setSearchParameterMap(SearchParameterMap theSearchParameterMap) {
	mySearchParameterMap = SerializationUtils.serialize(theSearchParameterMap);
}
} }

View File

@ -22,8 +22,23 @@ package ca.uhn.fhir.jpa.entity;
public enum SearchStatusEnum { public enum SearchStatusEnum {
/**
* The search is currently actively working
*/
LOADING, LOADING,
/**
* The search has loaded a set of results and has stopped searching because it
* reached an appropriate threshold
*/
PASSCMPLET,
/**
* The search completed normally and loaded all of the results it was permitted to
* load
*/
FINISHED, FINISHED,
/**
* The search failed and will not continue
*/
FAILED FAILED
} }

View File

@ -63,12 +63,7 @@ public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundl
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED); txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
List<IBaseResource> retVal = txTemplate.execute(new TransactionCallback<List<IBaseResource>>() { List<IBaseResource> retVal = txTemplate.execute(theStatus -> toResourceList(mySearchBuilder, pids));
@Override
public List<IBaseResource> doInTransaction(TransactionStatus theStatus) {
return toResourceList(mySearchBuilder, pids);
}
});
ourLog.trace("Loaded resources to return"); ourLog.trace("Loaded resources to return");

View File

@ -21,10 +21,7 @@ package ca.uhn.fhir.jpa.search;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
@ -43,7 +40,6 @@ import ca.uhn.fhir.rest.server.method.PageMethodBinding;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
@ -67,11 +63,13 @@ import javax.persistence.EntityManager;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
public static final int DEFAULT_SYNC_SIZE = 250; public static final int DEFAULT_SYNC_SIZE = 250;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>(); private final ConcurrentHashMap<String, BaseTask> myIdToSearchTask = new ConcurrentHashMap<>();
@Autowired @Autowired
private FhirContext myContext; private FhirContext myContext;
@Autowired @Autowired
@ -90,6 +88,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private ISearchResultDao mySearchResultDao; private ISearchResultDao mySearchResultDao;
@Autowired @Autowired
private PlatformTransactionManager myManagedTxManager; private PlatformTransactionManager myManagedTxManager;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
private int mySyncSize = DEFAULT_SYNC_SIZE; private int mySyncSize = DEFAULT_SYNC_SIZE;
@ -103,7 +103,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Override @Override
public void cancelAllActiveSearches() { public void cancelAllActiveSearches() {
for (SearchTask next : myIdToSearchTask.values()) { for (BaseTask next : myIdToSearchTask.values()) {
next.requestImmediateAbort(); next.requestImmediateAbort();
try { try {
next.getCompletionLatch().await(30, TimeUnit.SECONDS); next.getCompletionLatch().await(30, TimeUnit.SECONDS);
@ -113,21 +113,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
} }
/**
* This method is called by the HTTP client processing thread in order to
* fetch resources.
*/
@Override @Override
@Transactional(propagation = Propagation.NEVER) @Transactional(propagation = Propagation.NEVER)
public List<Long> getResources(final String theUuid, int theFrom, int theTo) { public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
if (myNeverUseLocalSearchForUnitTests == false) {
SearchTask task = myIdToSearchTask.get(theUuid);
if (task != null) {
ourLog.trace("Local search found");
return task.getResourcePids(theFrom, theTo);
} else {
ourLog.trace("No local search found");
}
} else {
ourLog.trace("Forced not using local search");
}
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
@ -135,12 +127,18 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
while (true) { while (true) {
search = txTemplate.execute(new TransactionCallback<Search>() { if (myNeverUseLocalSearchForUnitTests == false) {
@Override BaseTask task = myIdToSearchTask.get(theUuid);
public Search doInTransaction(TransactionStatus theStatus) { if (task != null) {
return mySearchDao.findByUuid(theUuid); ourLog.trace("Local search found");
List<Long> resourcePids = task.getResourcePids(theFrom, theTo);
if (resourcePids != null) {
return resourcePids;
} }
}); }
}
search = txTemplate.execute(t -> mySearchDao.findByUuid(theUuid));
if (search == null) { if (search == null) {
ourLog.info("Client requested unknown paging ID[{}]", theUuid); ourLog.info("Client requested unknown paging ID[{}]", theUuid);
@ -162,6 +160,21 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms"); throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
} }
// If the search was saved in "pass complete mode" it's probably time to
// start a new pass
if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
Optional<Search> newSearch = tryToMarkSearchAsInProgress(search);
if (newSearch.isPresent()) {
search = newSearch.get();
String resourceType = search.getResourceType();
Class<? extends IBaseResource> type = myContext.getResourceDefinition(resourceType).getImplementingClass();
SearchParameterMap params = search.getSearchParameterMap();
SearchContinuationTask task = new SearchContinuationTask(search, mySystemDao.getDao(type), params, resourceType);
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
}
}
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -176,20 +189,40 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
final Search foundSearch = search; final Search foundSearch = search;
List<Long> retVal = txTemplate.execute(new TransactionCallback<List<Long>>() { ourLog.trace("Loading stored search");
@Override List<Long> retVal = txTemplate.execute(theStatus -> {
public List<Long> doInTransaction(TransactionStatus theStatus) { final List<Long> resultPids = new ArrayList<>();
final List<Long> resultPids = new ArrayList<Long>();
Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page); Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page);
for (Long next : searchResultPids) { for (Long next : searchResultPids) {
resultPids.add(next); resultPids.add(next);
} }
return resultPids; return resultPids;
}
}); });
return retVal; return retVal;
} }
/**
 * Attempts to flip a {@link SearchStatusEnum#PASSCMPLET} search back to
 * {@link SearchStatusEnum#LOADING} inside a new transaction, so that a
 * continuation pass can resume loading results.
 *
 * @return the updated (managed) Search entity, or {@link Optional#empty()}
 *         if the transition failed - e.g. another thread activated the
 *         search first and the optimistic-lock check rejected the save
 */
private Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
	ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
	try {
		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.afterPropertiesSet();
		return txTemplate.execute(t -> {
			Optional<Search> searchOpt = mySearchDao.findById(theSearch.getId());
			Search search = searchOpt.orElseThrow(IllegalStateException::new);
			if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
				throw new IllegalStateException("Can't change to LOADING because state is " + search.getStatus());
			}
			// BUGFIX: mutate and save the freshly loaded managed entity, not the
			// stale caller-supplied instance - otherwise the status check above is
			// performed on one entity while the write (and its optimistic-lock
			// version) comes from another, defeating the reload
			search.setStatus(SearchStatusEnum.LOADING);
			Search newSearch = mySearchDao.save(search);
			return Optional.of(newSearch);
		});
	} catch (Exception e) {
		// Expected under contention: losing the activation race is not an error,
		// the caller simply keeps polling
		ourLog.warn("Failed to activate search: {}", e.toString());
		return Optional.empty();
	}
}
private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) { private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
theRetVal.setContext(myContext); theRetVal.setContext(myContext);
theRetVal.setEntityManager(myEntityManager); theRetVal.setEntityManager(myEntityManager);
@ -324,7 +357,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
Search search = new Search(); Search search = new Search();
populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search); populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search);
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, searchUuid); SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType);
myIdToSearchTask.put(search.getUuid(), task); myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task); myExecutor.submit(task);
@ -391,65 +424,194 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myManagedTxManager = theTxManager; myManagedTxManager = theTxManager;
} }
/** public abstract class BaseTask implements Callable<Void> {
* A search task is a Callable task that runs in // FIXME: don't make this protected
* a thread pool to handle an individual search. One instance protected final Search mySearch;
* is created for any requested search and runs from the protected final SearchParameterMap myParams;
* beginning to the end of the search. protected final IDao myCallingDao;
* <p> protected final String myResourceType;
* Understand: protected final ArrayList<Long> mySyncedPids = new ArrayList<>();
* This class executes in its own thread separate from the protected final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
* web server client thread that made the request. We do that
* so that we can return to the client as soon as possible,
* but keep the search going in the background (and have
* the next page of results ready to go when the client asks).
*/
public class SearchTask implements Callable<Void> {
private final IDao myCallingDao;
private final CountDownLatch myCompletionLatch; private final CountDownLatch myCompletionLatch;
private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
private final SearchParameterMap myParams;
private final String myResourceType;
private final Search mySearch;
private final ArrayList<Long> mySyncedPids = new ArrayList<>();
private final ArrayList<Long> myUnsyncedPids = new ArrayList<>(); private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
private boolean myAbortRequested; private boolean myAbortRequested;
private int myCountSaved = 0; private int myCountSaved = 0;
private String mySearchUuid; private boolean myAdditionalPrefetchThresholdsRemaining;
private int myTotalNumberSynced;
private List<Long> myPreviouslyAddedResourcePids;
/** /**
* Constructor * Constructor
*/ */
public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) { protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
mySearch = theSearch; mySearch = theSearch;
myCallingDao = theCallingDao; myCallingDao = theCallingDao;
myParams = theParams; myParams = theParams;
myResourceType = theResourceType; myResourceType = theResourceType;
myCompletionLatch = new CountDownLatch(1); myCompletionLatch = new CountDownLatch(1);
mySearchUuid = theSearchUuid; }
protected void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids) {
myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
myCountSaved = myPreviouslyAddedResourcePids.size();
}
private ISearchBuilder newSearchBuilder() {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = myCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, myResourceType);
// How many results are we pre-loading right now
int currentlyLoaded = defaultIfNull(mySearch.getTotalCount(), 0);
for (Iterator<Integer> iter = myDaoConfig.getPreFetchThresholds().iterator(); iter.hasNext(); ) {
int next = iter.next();
if (next != -1 && next <= currentlyLoaded) {
continue;
}
if (next == -1) {
sb.setMaxResultsToFetch(null);
} else {
sb.setMaxResultsToFetch(next);
}
if (iter.hasNext()) {
myAdditionalPrefetchThresholdsRemaining = true;
}
// If we get here's we've found an appropriate threshold
break;
}
return sb;
}
public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
boolean keepWaiting;
do {
synchronized (mySyncedPids) {
ourLog.trace("Search status is {}", mySearch.getStatus());
keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING;
}
if (keepWaiting) {
ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
try {
Thread.sleep(500);
} catch (InterruptedException theE) {
// ignore
}
}
} while (keepWaiting);
ourLog.info("Proceeding, as we have {} results", mySyncedPids.size());
ArrayList<Long> retVal = new ArrayList<>();
synchronized (mySyncedPids) {
verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
int toIndex = theToIndex;
if (mySyncedPids.size() < toIndex) {
toIndex = mySyncedPids.size();
}
for (int i = theFromIndex; i < toIndex; i++) {
retVal.add(mySyncedPids.get(i));
}
}
ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
return retVal;
}
protected void saveSearch() {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
doSaveSearch();
}
});
}
private void saveUnsynced(final Iterator<Long> theResultIter) {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
if (mySearch.getId() == null) {
doSaveSearch();
}
List<SearchResult> resultsToSave = Lists.newArrayList();
for (Long nextPid : myUnsyncedPids) {
SearchResult nextResult = new SearchResult(mySearch);
nextResult.setResourcePid(nextPid);
nextResult.setOrder(myCountSaved++);
resultsToSave.add(nextResult);
ourLog.trace("Saving ORDER[{}] Resource {}", nextResult.getOrder(), nextResult.getResourcePid());
}
mySearchResultDao.saveAll(resultsToSave);
synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size();
myTotalNumberSynced += numSyncedThisPass;
ourLog.trace("Syncing {} search results", numSyncedThisPass);
mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear();
if (theResultIter.hasNext() == false) {
mySearch.setTotalCount(myCountSaved);
if (myAdditionalPrefetchThresholdsRemaining) {
ourLog.trace("Setting search status to PASSCMPLET");
mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
mySearch.setSearchParameterMap(myParams);
} else {
mySearch.setStatus(SearchStatusEnum.FINISHED);
}
}
}
mySearch.setNumFound(myCountSaved);
int numSynced;
synchronized (mySyncedPids) {
numSynced = mySyncedPids.size();
}
if (myDaoConfig.getCountSearchResultsUpTo() == null ||
myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
myInitialCollectionLatch.countDown();
}
doSaveSearch();
}
});
}
public boolean isNotAborted() {
return myAbortRequested == false;
}
protected void markComplete() {
myCompletionLatch.countDown();
}
public CountDownLatch getCompletionLatch() {
return myCompletionLatch;
} }
/** /**
* This method is called by the server HTTP thread, and * Request that the task abort as soon as possible
* will block until at least one page of results have been
* fetched from the DB, and will never block after that.
*/ */
public Integer awaitInitialSync() { public void requestImmediateAbort() {
ourLog.trace("Awaiting initial sync"); myAbortRequested = true;
do {
try {
if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
// Shouldn't happen
throw new InternalErrorException(e);
}
} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
ourLog.trace("Initial sync completed");
return mySearch.getTotalCount();
} }
/** /**
@ -497,7 +659,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myUnsyncedPids.clear(); myUnsyncedPids.clear();
Throwable rootCause = ExceptionUtils.getRootCause(t); Throwable rootCause = ExceptionUtils.getRootCause(t);
rootCause = ObjectUtils.defaultIfNull(rootCause, t); rootCause = defaultIfNull(rootCause, t);
String failureMessage = rootCause.getMessage(); String failureMessage = rootCause.getMessage();
@ -516,7 +678,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myIdToSearchTask.remove(mySearch.getUuid()); myIdToSearchTask.remove(mySearch.getUuid());
myInitialCollectionLatch.countDown(); myInitialCollectionLatch.countDown();
myCompletionLatch.countDown(); markComplete();
} }
return null; return null;
@ -539,7 +701,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
boolean wantOnlyCount = wantCount && myParams.getSummaryMode().size() == 1; boolean wantOnlyCount = wantCount && myParams.getSummaryMode().size() == 1;
if (wantCount) { if (wantCount) {
ISearchBuilder sb = newSearchBuilder(); ISearchBuilder sb = newSearchBuilder();
Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearchUuid); Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
Long count = countIterator.next(); Long count = countIterator.next();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
@ -560,12 +722,19 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
ISearchBuilder sb = newSearchBuilder(); ISearchBuilder sb = newSearchBuilder();
Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearchUuid); if (myPreviouslyAddedResourcePids != null) {
sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
mySyncedPids.addAll(myPreviouslyAddedResourcePids);
}
Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearch.getUuid());
int syncSize = mySyncSize;
while (theResultIterator.hasNext()) { while (theResultIterator.hasNext()) {
myUnsyncedPids.add(theResultIterator.next()); myUnsyncedPids.add(theResultIterator.next());
boolean shouldSync = myUnsyncedPids.size() >= mySyncSize; boolean shouldSync = myUnsyncedPids.size() >= syncSize;
if (myDaoConfig.getCountSearchResultsUpTo() != null && if (myDaoConfig.getCountSearchResultsUpTo() != null &&
myDaoConfig.getCountSearchResultsUpTo() > 0 && myDaoConfig.getCountSearchResultsUpTo() > 0 &&
@ -578,7 +747,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
// If no abort was requested, bail out // If no abort was requested, bail out
Validate.isTrue(myAbortRequested == false, "Abort has been requested"); Validate.isTrue(isNotAborted(), "Abort has been requested");
if (shouldSync) { if (shouldSync) {
saveUnsynced(theResultIterator); saveUnsynced(theResultIterator);
@ -595,128 +764,89 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
// If no abort was requested, bail out // If no abort was requested, bail out
Validate.isTrue(myAbortRequested == false, "Abort has been requested"); Validate.isTrue(isNotAborted(), "Abort has been requested");
saveUnsynced(theResultIterator); saveUnsynced(theResultIterator);
} }
private ISearchBuilder newSearchBuilder() {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = myCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, myResourceType);
return sb;
} }
public CountDownLatch getCompletionLatch() {
return myCompletionLatch; public class SearchContinuationTask extends BaseTask {
public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
super(theSearch, theCallingDao, theParams, theResourceType);
} }
public List<Long> getResourcePids(int theFromIndex, int theToIndex) { @Override
ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); public Void call() {
boolean keepWaiting;
do {
synchronized (mySyncedPids) {
keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING;
}
if (keepWaiting) {
ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
try { try {
Thread.sleep(500); TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
} catch (InterruptedException theE) { txTemplate.afterPropertiesSet();
// ignore txTemplate.execute(t -> {
} List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(mySearch);
} setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
} while (keepWaiting); return null;
});
} catch (Throwable e) {
ourLog.error("Failure processing search", e);
mySearch.setFailureMessage(e.toString());
mySearch.setStatus(SearchStatusEnum.FAILED);
ourLog.info("Proceeding, as we have {} results", mySyncedPids.size()); saveSearch();
return null;
ArrayList<Long> retVal = new ArrayList<>();
synchronized (mySyncedPids) {
verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
int toIndex = theToIndex;
if (mySyncedPids.size() < toIndex) {
toIndex = mySyncedPids.size();
}
for (int i = theFromIndex; i < toIndex; i++) {
retVal.add(mySyncedPids.get(i));
}
} }
ourLog.info("Done syncing results", mySyncedPids.size()); return super.call();
}
return retVal; @Override
public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
return super.getResourcePids(theFromIndex, theToIndex);
}
} }
/** /**
* Request that the task abort as soon as possible * A search task is a Callable task that runs in
* a thread pool to handle an individual search. One instance
* is created for any requested search and runs from the
* beginning to the end of the search.
* <p>
* Understand:
* This class executes in its own thread separate from the
* web server client thread that made the request. We do that
* so that we can return to the client as soon as possible,
* but keep the search going in the background (and have
* the next page of results ready to go when the client asks).
*/ */
public void requestImmediateAbort() { public class SearchTask extends BaseTask {
myAbortRequested = true;
/**
* Constructor
*/
public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
super(theSearch, theCallingDao, theParams, theResourceType);
} }
private void saveSearch() { /**
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); * This method is called by the server HTTP thread, and
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); * will block until at least one page of results have been
txTemplate.execute(new TransactionCallbackWithoutResult() { * fetched from the DB, and will never block after that.
@Override */
protected void doInTransactionWithoutResult(TransactionStatus theArg0) { public Integer awaitInitialSync() {
doSaveSearch(); ourLog.trace("Awaiting initial sync");
do {
try {
if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
break;
} }
} catch (InterruptedException e) {
}); // Shouldn't happen
throw new InternalErrorException(e);
} }
} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
ourLog.trace("Initial sync completed");
private void saveUnsynced(final Iterator<Long> theResultIter) { return mySearch.getTotalCount();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
if (mySearch.getId() == null) {
doSaveSearch();
}
List<SearchResult> resultsToSave = Lists.newArrayList();
for (Long nextPid : myUnsyncedPids) {
SearchResult nextResult = new SearchResult(mySearch);
nextResult.setResourcePid(nextPid);
nextResult.setOrder(myCountSaved++);
resultsToSave.add(nextResult);
}
mySearchResultDao.saveAll(resultsToSave);
synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size();
ourLog.trace("Syncing {} search results", numSyncedThisPass);
mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear();
if (theResultIter.hasNext() == false) {
mySearch.setTotalCount(myCountSaved);
mySearch.setStatus(SearchStatusEnum.FINISHED);
}
}
mySearch.setNumFound(myCountSaved);
int numSynced;
synchronized (mySyncedPids) {
numSynced = mySyncedPids.size();
}
if (myDaoConfig.getCountSearchResultsUpTo() == null ||
myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
myInitialCollectionLatch.countDown();
}
doSaveSearch();
}
});
} }
} }
@ -773,7 +903,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) { static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
if (theSearch.getStatus() == SearchStatusEnum.FAILED) { if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
Integer status = theSearch.getFailureCode(); Integer status = theSearch.getFailureCode();
status = ObjectUtils.defaultIfNull(status, 500); status = defaultIfNull(status, 500);
String message = theSearch.getFailureMessage(); String message = theSearch.getFailureMessage();
throw BaseServerResponseException.newInstance(status, message); throw BaseServerResponseException.newInstance(status, message);

View File

@ -105,7 +105,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
DataSource dataSource = ProxyDataSourceBuilder DataSource dataSource = ProxyDataSourceBuilder
.create(retVal) .create(retVal)
.logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL") // .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS) .logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
.countQuery(new ThreadQueryCountHolder()) .countQuery(new ThreadQueryCountHolder())
.build(); .build();

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.entity.TermConcept; import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test; import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider; import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc; import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
@ -35,6 +36,7 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionDefinition;
@ -211,15 +213,30 @@ public abstract class BaseJpaTest {
} }
protected List<String> toUnqualifiedVersionlessIdValues(IBundleProvider theFound) { protected List<String> toUnqualifiedVersionlessIdValues(IBundleProvider theFound) {
List<String> retVal = new ArrayList<String>(); int fromIndex = 0;
Integer size = theFound.size(); Integer toIndex = theFound.size();
ourLog.info("Found {} results", size); return toUnqualifiedVersionlessIdValues(theFound, fromIndex, toIndex, true);
if (size == null) {
size = 99999;
} }
List<IBaseResource> resources = theFound.getResources(0, size); @Autowired
private DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
protected List<String> toUnqualifiedVersionlessIdValues(IBundleProvider theFound, int theFromIndex, Integer theToIndex, boolean theFirstCall) {
if (theToIndex == null) {
theToIndex = 99999;
}
List<String> retVal = new ArrayList<>();
IBundleProvider bundleProvider;
if (theFirstCall) {
bundleProvider = theFound;
} else {
bundleProvider = myDatabaseBackedPagingProvider.retrieveResultList(theFound.getUuid());
}
List<IBaseResource> resources = bundleProvider.getResources(theFromIndex, theToIndex);
for (IBaseResource next : resources) { for (IBaseResource next : resources) {
retVal.add(next.getIdElement().toUnqualifiedVersionless().getValue()); retVal.add(next.getIdElement().toUnqualifiedVersionless().getValue());
} }

View File

@ -208,6 +208,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Autowired @Autowired
protected ISearchDao mySearchEntityDao; protected ISearchDao mySearchEntityDao;
@Autowired @Autowired
protected ISearchResultDao mySearchResultDao;
@Autowired
@Qualifier("mySearchParameterDaoR4") @Qualifier("mySearchParameterDaoR4")
protected IFhirResourceDao<SearchParameter> mySearchParameterDao; protected IFhirResourceDao<SearchParameter> mySearchParameterDao;
@Autowired @Autowired

View File

@ -0,0 +1,235 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.r4.model.Patient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static org.junit.Assert.assertEquals;
@SuppressWarnings({"unchecked", "deprecation", "Duplicates"})
public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4SearchOptimizedTest.class);
@After
public final void after() {
}
@Before
public void start() {
runInTransaction(() -> {
for (int i = 0; i < 200; i++) {
Patient p = new Patient();
p.setId("PT" + leftPad(Integer.toString(i), 5, '0'));
p.setActive(true);
p.addName().setFamily("FAM" + leftPad(Integer.toString(i), 5, '0'));
myPatientDao.update(p);
}
});
}
@Test
public void testFetchOnlySmallBatches() {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
/*
* Load the first page of 10
*/
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
/*
* 20 should be prefetched since that's the initial page size
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(20, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
/*
* Load a few more that shouldn't require a new page fetch
*/
params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
results = myPatientDao.search(params);
ids = toUnqualifiedVersionlessIdValues(results, 10, 15, false);
assertEquals("Patient/PT00010", ids.get(0));
assertEquals("Patient/PT00014", ids.get(4));
/*
* Search should be untouched
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(1, search.getVersion().intValue());
});
/*
* Now load a page that crosses the next threshold
*/
ids = toUnqualifiedVersionlessIdValues(results, 15, 25, false);
assertEquals("Patient/PT00015", ids.get(0));
assertEquals("Patient/PT00024", ids.get(9));
/*
* Search gets incremented twice as a part of loading the next batch
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(50, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(3, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
/*
* Load a few more that shouldn't require a new page fetch
*/
params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
results = myPatientDao.search(params);
ids = toUnqualifiedVersionlessIdValues(results, 25, 30, false);
assertEquals("Patient/PT00025", ids.get(0));
assertEquals("Patient/PT00029", ids.get(4));
/*
* Search should be untouched
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(3, search.getVersion().intValue());
});
/*
* Now load a page that crosses the next threshold
*/
ids = toUnqualifiedVersionlessIdValues(results, 50, 60, false);
assertEquals("Patient/PT00050", ids.get(0));
assertEquals("Patient/PT00059", ids.get(9));
/*
* Search gets incremented twice as a part of loading the next batch
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(190, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(5, search.getVersion().intValue());
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
});
/*
* Finally, load a page at the very end of the possible pages
*/
ids = toUnqualifiedVersionlessIdValues(results, 180, 200, false);
assertEquals(10, ids.size());
assertEquals("Patient/PT00180", ids.get(0));
assertEquals("Patient/PT00189", ids.get(9));
}
@Test
public void testFetchSecondBatchInManyThreads() throws Throwable {
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, -1));
/*
* Load the first page of 10
*/
SearchParameterMap params = new SearchParameterMap();
params.setSort(new SortSpec(Patient.SP_NAME));
final IBundleProvider results = myPatientDao.search(params);
String uuid = results.getUuid();
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
assertEquals("Patient/PT00000", ids.get(0));
assertEquals("Patient/PT00009", ids.get(9));
/*
* 20 should be prefetched since that's the initial page size
*/
runInTransaction(() -> {
Search search = mySearchEntityDao.findByUuid(uuid);
assertEquals(20, search.getTotalCount().intValue());
assertEquals(search.getTotalCount().intValue(), mySearchResultDao.count());
assertEquals(1, search.getVersion().intValue());
assertEquals(SearchStatusEnum.PASSCMPLET, search.getStatus());
});
/*
* Load a few more that shouldn't require a new page fetch
*/
ThreadPoolExecutorFactoryBean executorFactory = new ThreadPoolExecutorFactoryBean();
executorFactory.setCorePoolSize(20);
executorFactory.setMaxPoolSize(20);
executorFactory.afterPropertiesSet();
ExecutorService executor = executorFactory.getObject();
List<Future<Throwable>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
int finalI = i;
Future<Throwable> future = executor.submit(() -> {
try {
List<String> ids1 = toUnqualifiedVersionlessIdValues(results, 180, 190, false);
assertEquals("Patient/PT00180", ids1.get(0));
assertEquals("Patient/PT00189", ids1.get(9));
} catch (Throwable t) {
ourLog.error("Exception in thread {} - {}", finalI, t.toString());
return t;
}
return null;
});
futures.add(future);
}
for (Future<Throwable> next : futures) {
Throwable t = next.get();
if (t!=null) {
throw t;
}
}
executor.shutdownNow();
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -53,6 +53,13 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.nonNullable() .nonNullable()
.withType(BaseTableColumnTypeTask.ColumnTypeEnum.STRING, ResourceLink.SRC_PATH_LENGTH); .withType(BaseTableColumnTypeTask.ColumnTypeEnum.STRING, ResourceLink.SRC_PATH_LENGTH);
// Search
Builder.BuilderWithTableName search = version.onTable("HFJ_SEARCH");
version.startSectionWithMessage("Starting work on table: " + search.getTableName());
search
.addColumn("OPTLOCK_VERSION")
.nullable()
.type(BaseTableColumnTypeTask.ColumnTypeEnum.INT);
} }
private void init350() { private void init350() {

View File

@ -1,5 +1,7 @@
<div> <div>
{{ Patient.name.given }}
{% if name.empty() == false %} {% if name.empty() == false %}
{{ name[0].family }} {{ name[0].family }}
{{ name[0].given }} {{ name[0].given }}

View File

@ -53,8 +53,6 @@ import org.w3c.dom.Node;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import org.xml.sax.XMLReader; import org.xml.sax.XMLReader;
import com.sun.webkit.ContextMenu.ShowContext;
public class XmlParser extends ParserBase { public class XmlParser extends ParserBase {
private boolean allowXsiLocation; private boolean allowXsiLocation;

View File

@ -33,7 +33,8 @@
has been corrected so that an IllegalArgumentException is now thrown. has been corrected so that an IllegalArgumentException is now thrown.
</action> </action>
<action type="add"> <action type="add">
A new operation has been added to the JPA server called $retrigger-subscription. This can A new operation has been added to the JPA server called
<![CDATA[<code>$retrigger-subscription</code>]]>. This can
be used to cause a transaction to redeliver a resource that previously triggered. be used to cause a transaction to redeliver a resource that previously triggered.
</action> </action>
<action type="add"> <action type="add">
@ -48,6 +49,22 @@
When using the HAPI FHIR CLI, user-prompted passwords were not correctly encoded, meaning that the When using the HAPI FHIR CLI, user-prompted passwords were not correctly encoded, meaning that the
"--basic-auth PROMPT" action was not usable. This has been corrected. "--basic-auth PROMPT" action was not usable. This has been corrected.
</action> </action>
<action type="add">
The JPA server SearchCoordinator has been refactored to make searches more efficient:
When a search is performed, the SearchCoordinator loads multiple pages of results even
if the user has only requested a small number. This is done in order to prevent needing
to re-run the search for every page of results that is loaded.
In previous versions of HAPI FHIR, when a search was made the SearchCoordinator would
prefetch as many results as the user could possibly request across all pages (even if
this meant prefetching thousands or millions of resources).
As of this version, a new option has been added to DaoConfig that specifies how many
resources to prefetch. This can have a significant impact on performance for servers
with a large number of resources, where users often only want the first page
of results.
See
<![CDATA[<code>DatConfig#setSearchPreFetchThresholds()</code>]]>
for configuration of this feature.
</action>
</release> </release>
<release version="3.5.0" date="2018-09-17"> <release version="3.5.0" date="2018-09-17">
@ -393,6 +410,14 @@
guaranteed otherwise, since the Search Controller can result in data being returned guaranteed otherwise, since the Search Controller can result in data being returned
before the total number of results is known). before the total number of results is known).
</action> </action>
<action type="add">
The JPA server SearchCoordinator now prefetches only a smaller and configurable number
of results during the initial search request, and more may be requested in subsequent
page requests. This change may have a significant improvement on performance: in
previous versions of HAPI FHIR, even if the user only wanted the first page of 10
results, many many more might be prefetched, consuming database resources and
server time.
</action>
</release> </release>
<release version="3.4.0" date="2018-05-28"> <release version="3.4.0" date="2018-05-28">
<action type="add"> <action type="add">