More perf work

This commit is contained in:
James Agnew 2017-04-16 12:08:23 -04:00
parent 913fd422a1
commit 62ece72e6f
20 changed files with 1615 additions and 1000 deletions

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.rest.method;
*/
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.hamcrest.Matchers.emptyCollectionOf;
import java.lang.reflect.Method;
import java.util.HashSet;
@ -93,7 +94,8 @@ public class PageMethodBinding extends BaseResourceReturningMethodBinding {
IBundleProvider resultList = pagingProvider.retrieveResultList(thePagingAction);
if (resultList == null) {
ourLog.info("Client requested unknown paging ID[{}]", thePagingAction);
throw new ResourceGoneException("Search ID[" + thePagingAction + "] does not exist and may have expired.");
String msg = getContext().getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", thePagingAction);
throw new ResourceGoneException(msg);
}
Integer count = RestfulServerUtils.extractCountParameter(theRequest);

View File

@ -30,6 +30,8 @@ ca.uhn.fhir.rest.method.OperationParameter.urlParamNotPrimitive=Can not invoke o
ca.uhn.fhir.rest.method.IncludeParameter.invalidIncludeNameInRequest=Invalid {2} parameter value: "{0}". Valid values are: {1}
ca.uhn.fhir.rest.method.IncludeParameter.orIncludeInRequest='OR' query parameters (values containing ',') are not supported in _include parameters
ca.uhn.fhir.rest.method.PageMethodBinding.unknownSearchId=Search ID "{0}" does not exist and may have expired
ca.uhn.fhir.rest.method.SearchMethodBinding.invalidSpecialParamName=Method [{0}] in provider [{1}] contains search parameter annotated to use name [{2}] - This name is reserved according to the FHIR specification and can not be used as a search parameter name.
ca.uhn.fhir.rest.method.SearchMethodBinding.idWithoutCompartment=Method [{0}] in provider [{1}] has an @IdParam parameter. This is only allowable for compartment search (e.g. @Search(compartment="foo") )
ca.uhn.fhir.rest.method.SearchMethodBinding.idNullForCompartmentSearch=ID parameter can not be null or empty for compartment search

View File

@ -37,9 +37,7 @@ import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.jpa.search.*;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
@ -48,7 +46,6 @@ import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
@EnableJpaRepositories(basePackages = "ca.uhn.fhir.jpa.dao.data")
public class BaseConfig implements SchedulingConfigurer {
@Resource
private ApplicationContext myAppCtx;
@ -73,6 +70,11 @@ public class BaseConfig implements SchedulingConfigurer {
return b;
}
@Bean(autowire=Autowire.BY_TYPE)
public ISearchCoordinatorSvc searchCoordinatorSvc() {
return new SearchCoordinatorSvcImpl();
}
@Bean
public ISearchParamPresenceSvc searchParamPresenceSvc() {
return new SearchParamPresenceSvcImpl();

View File

@ -78,6 +78,7 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
@ -186,6 +187,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> myResourceTypeToDao;
@Autowired
protected ISearchCoordinatorSvc mySearchCoordinatorSvc;
@Autowired
private ISearchDao mySearchDao;
@ -509,6 +513,56 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
}
}
@SuppressWarnings("unchecked")
private <T extends BaseResourceIndexedSearchParam> void findMissingSearchParams(ResourceTable theEntity, Set<Entry<String, RuntimeSearchParam>> activeSearchParams, RestSearchParameterTypeEnum type,
Set<T> paramCollection) {
for (Entry<String, RuntimeSearchParam> nextEntry : activeSearchParams) {
String nextParamName = nextEntry.getKey();
if (nextEntry.getValue().getParamType() == type) {
boolean haveParam = false;
for (BaseResourceIndexedSearchParam nextParam : paramCollection) {
if (nextParam.getParamName().equals(nextParamName)) {
haveParam = true;
break;
}
}
if (!haveParam) {
BaseResourceIndexedSearchParam param;
switch (type) {
case DATE:
param = new ResourceIndexedSearchParamDate();
break;
case NUMBER:
param = new ResourceIndexedSearchParamNumber();
break;
case QUANTITY:
param = new ResourceIndexedSearchParamQuantity();
break;
case STRING:
param = new ResourceIndexedSearchParamString();
break;
case TOKEN:
param = new ResourceIndexedSearchParamToken();
break;
case URI:
param = new ResourceIndexedSearchParamUri();
break;
case COMPOSITE:
case HAS:
case REFERENCE:
default:
continue;
}
param.setResource(theEntity);
param.setMissing(true);
param.setParamName(nextParamName);
paramCollection.add((T) param);
}
}
}
}
protected DaoConfig getConfig() {
return myConfig;
}
@ -672,7 +726,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
theProvider.setEntityManager(myEntityManager);
theProvider.setPlatformTransactionManager(myPlatformTransactionManager);
theProvider.setSearchDao(mySearchDao);
theProvider.setSearchResultDao(mySearchResultDao);
theProvider.setSearchCoordinatorSvc(mySearchCoordinatorSvc);
}
protected boolean isLogicalReference(IIdType theId) {
@ -1522,54 +1576,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
return theEntity;
}
@SuppressWarnings("unchecked")
private <T extends BaseResourceIndexedSearchParam> void findMissingSearchParams(ResourceTable theEntity, Set<Entry<String, RuntimeSearchParam>> activeSearchParams, RestSearchParameterTypeEnum type,
Set<T> paramCollection) {
for (Entry<String, RuntimeSearchParam> nextEntry : activeSearchParams) {
String nextParamName = nextEntry.getKey();
if (nextEntry.getValue().getParamType() == type) {
boolean haveParam = false;
for (BaseResourceIndexedSearchParam nextParam : paramCollection) {
if (nextParam.getParamName().equals(nextParamName)) {
haveParam = true;
break;
}
}
if (!haveParam) {
BaseResourceIndexedSearchParam param;
switch (type) {
case DATE:
param = new ResourceIndexedSearchParamDate();
break;
case NUMBER:
param = new ResourceIndexedSearchParamNumber();
break;
case QUANTITY:
param = new ResourceIndexedSearchParamQuantity();
break;
case STRING:
param = new ResourceIndexedSearchParamString();
break;
case TOKEN:
param = new ResourceIndexedSearchParamToken();
break;
case URI:
param = new ResourceIndexedSearchParamUri();
break;
case COMPOSITE:
case HAS:
case REFERENCE:
default:
continue;
}
param.setResource(theEntity);
param.setMissing(true);
param.setParamName(nextParamName);
paramCollection.add((T) param);
}
}
}
protected ResourceTable updateEntity(IBaseResource theResource, ResourceTable entity, Date theDeletedTimestampOrNull, Date theUpdateTime) {
return updateEntity(theResource, entity, theDeletedTimestampOrNull, true, true, theUpdateTime);
}
private void updateSearchParamPresent(Map<String, Boolean> presentSearchParams, Set<? extends BaseResourceIndexedSearchParam> params) {
@ -1578,10 +1586,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
}
}
protected ResourceTable updateEntity(IBaseResource theResource, ResourceTable entity, Date theDeletedTimestampOrNull, Date theUpdateTime) {
return updateEntity(theResource, entity, theDeletedTimestampOrNull, true, true, theUpdateTime);
}
private void validateChildReferences(IBase theElement, String thePath) {
if (theElement == null) {
return;

View File

@ -755,7 +755,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
* Subclasses may override to provide behaviour. Invoked within a delete
* transaction with the resource that is about to be deleted.
*/
@SuppressWarnings("unused")
protected void preDelete(T theResourceToDelete, ResourceTable theEntityToDelete) {
// nothing by default
}
@ -955,9 +954,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
notifyInterceptors(RestOperationTypeEnum.SEARCH_TYPE, requestDetails);
}
SearchBuilder builder = newSearchBuilder();
builder.setType(getResourceType(), getResourceName());
return builder.search(theParams);
return mySearchCoordinatorSvc.registerSearch(this, theParams, getResourceName());
}

View File

@ -26,7 +26,6 @@ import javax.servlet.http.HttpServletRequest;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.springframework.beans.factory.annotation.Autowired;
import ca.uhn.fhir.jpa.dao.SearchParameterMap.EverythingModeEnum;
import ca.uhn.fhir.model.api.IResource;
@ -47,9 +46,6 @@ public class FhirResourceDaoPatientDstu2 extends FhirResourceDaoDstu2<Patient>im
super();
}
@Autowired
private ISearchParamRegistry mySerarchParamRegistry;
private IBundleProvider doEverythingOperation(IIdType theId, IPrimitiveType<Integer> theCount, DateRangeParam theLastUpdated, SortSpec theSort, StringAndListParam theContent, StringAndListParam theNarrative) {
SearchParameterMap paramMap = new SearchParameterMap();
if (theCount != null) {
@ -69,9 +65,7 @@ public class FhirResourceDaoPatientDstu2 extends FhirResourceDaoDstu2<Patient>im
paramMap.add("_id", new StringParam(theId.getIdPart()));
}
SearchBuilder builder = new SearchBuilder(getContext(), myEntityManager, myPlatformTransactionManager, mySearchDao, mySearchResultDao, this, myResourceIndexedSearchParamUriDao, myForcedIdDao, myTerminologySvc, mySerarchParamRegistry);
builder.setType(getResourceType(), getResourceName());
return builder.search(paramMap);
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName());
}
@Override

View File

@ -73,7 +73,7 @@ public interface IDao {
*/
void injectDependenciesIntoBundleProvider(PersistedJpaBundleProvider theProvider);
SearchBuilder newSearchBuilder();
ISearchBuilder newSearchBuilder();
void populateFullTextFields(IBaseResource theResource, ResourceTable theEntity);

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.dao;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.persistence.EntityManager;
import org.hl7.fhir.instance.model.api.IBaseResource;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.param.DateRangeParam;
public interface ISearchBuilder {
Iterator<Long> createQuery(SearchParameterMap theParams);
void setType(Class<? extends IBaseResource> theResourceType, String theResourceName);
void loadResourcesByPid(Collection<Long> theIncludePids, List<IBaseResource> theResourceListToPopulate, Set<Long> theRevIncludedPids, boolean theForHistoryOperation, EntityManager theEntityManager,
FhirContext theContext, IDao theDao);
Set<Long> loadReverseIncludes(IDao theCallingDao, FhirContext theContext, EntityManager theEntityManager, Collection<Long> theMatches, Set<Include> theRevIncludes, boolean theReverseMode,
DateRangeParam theLastUpdated);
}

View File

@ -69,9 +69,7 @@ public class FhirResourceDaoPatientDstu3 extends FhirResourceDaoDstu3<Patient>im
paramMap.add("_id", new StringParam(theId.getIdPart()));
}
SearchBuilder builder = new SearchBuilder(getContext(), myEntityManager, myPlatformTransactionManager, mySearchDao, mySearchResultDao, this, myResourceIndexedSearchParamUriDao, myForcedIdDao, myTerminologySvc, mySerarchParamRegistry);
builder.setType(getResourceType(), getResourceName());
return builder.search(paramMap);
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName());
}
@Override

View File

@ -1,5 +1,7 @@
package ca.uhn.fhir.jpa.entity;
import static org.apache.commons.lang3.StringUtils.left;
/*
* #%L
* HAPI FHIR JPA Server
@ -44,12 +46,17 @@ import ca.uhn.fhir.rest.param.DateRangeParam;
//@formatter:on
public class Search implements Serializable {
private static final int FAILURE_MESSAGE_LENGTH = 500;
private static final long serialVersionUID = 1L;
@Temporal(TemporalType.TIMESTAMP)
@Column(name="CREATED", nullable=false, updatable=false)
private Date myCreated;
@Column(name="FAILURE_MESSAGE", length=FAILURE_MESSAGE_LENGTH, nullable=true)
private String myFailureMessage;
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator="SEQ_SEARCH")
@SequenceGenerator(name="SEQ_SEARCH", sequenceName="SEQ_SEARCH")
@ -67,6 +74,9 @@ public class Search implements Serializable {
@Column(name="LAST_UPDATED_LOW", nullable=true, insertable=true, updatable=false)
private Date myLastUpdatedLow;
@Column(name="NUM_FOUND", nullable=false)
private int myNumFound;
@Column(name="PREFERRED_PAGE_SIZE", nullable=true)
private Integer myPreferredPageSize;
@ -86,7 +96,11 @@ public class Search implements Serializable {
@Column(name="SEARCH_TYPE", nullable=false)
private SearchTypeEnum mySearchType;
@Column(name="TOTAL_COUNT", nullable=false)
@Enumerated(EnumType.STRING)
@Column(name="SEARCH_STATUS", nullable=false, length=10)
private SearchStatusEnum myStatus;
@Column(name="TOTAL_COUNT", nullable=true)
private Integer myTotalCount;
@Column(name="SEARCH_UUID", length=40, nullable=false, updatable=false)
@ -96,6 +110,10 @@ public class Search implements Serializable {
return myCreated;
}
public String getFailureMessage() {
return myFailureMessage;
}
public Long getId() {
return myId;
}
@ -123,6 +141,10 @@ public class Search implements Serializable {
return myLastUpdatedLow;
}
public int getNumFound() {
return myNumFound;
}
public Integer getPreferredPageSize() {
return myPreferredPageSize;
}
@ -139,11 +161,14 @@ public class Search implements Serializable {
return mySearchType;
}
public SearchStatusEnum getStatus() {
return myStatus;
}
public Integer getTotalCount() {
return myTotalCount;
}
public String getUuid() {
return myUuid;
}
@ -152,10 +177,16 @@ public class Search implements Serializable {
myCreated = theCreated;
}
public void setFailureMessage(String theFailureMessage) {
myFailureMessage = left(theFailureMessage, FAILURE_MESSAGE_LENGTH);
}
public void setLastUpdated(Date theLowerBound, Date theUpperBound) {
myLastUpdatedLow = theLowerBound;
myLastUpdatedHigh = theUpperBound;
}
public void setLastUpdated(DateRangeParam theLastUpdated) {
if (theLastUpdated == null) {
myLastUpdatedLow = null;
@ -165,6 +196,9 @@ public class Search implements Serializable {
myLastUpdatedHigh = theLastUpdated.getUpperBoundAsInstant();
}
}
public void setNumFound(int theNumFound) {
myNumFound = theNumFound;
}
public void setPreferredPageSize(Integer thePreferredPageSize) {
myPreferredPageSize = thePreferredPageSize;
@ -183,6 +217,10 @@ public class Search implements Serializable {
mySearchType = theSearchType;
}
public void setStatus(SearchStatusEnum theStatus) {
myStatus = theStatus;
}
public void setTotalCount(Integer theTotalCount) {
myTotalCount = theTotalCount;
}

View File

@ -106,7 +106,8 @@ public class SearchResult implements Serializable {
myOrder = theOrder;
}
public void setResourcePid(Long theResourcePid) {
public SearchResult setResourcePid(Long theResourcePid) {
myResourcePid = theResourcePid;
return this;
}
}

View File

@ -0,0 +1,9 @@
package ca.uhn.fhir.jpa.entity;
public enum SearchStatusEnum {
LOADING,
FINISHED,
FAILED
}

View File

@ -38,16 +38,23 @@ import ca.uhn.fhir.rest.server.IPagingProvider;
public class DatabaseBackedPagingProvider extends BasePagingProvider implements IPagingProvider {
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@Autowired
private ISearchResultDao mySearchResultDao;
@Autowired
private EntityManager myEntityManager;
@Autowired
private FhirContext myContext;
@Autowired
private IFhirSystemDao<?, ?> myDao;
@Autowired
private EntityManager myEntityManager;
@Autowired
private PlatformTransactionManager myPlatformTransactionManager;
@Autowired
private ISearchResultDao mySearchResultDao;
/**
* Constructor
*/
public DatabaseBackedPagingProvider() {
super();
}
/**
* Constructor
@ -58,13 +65,6 @@ public class DatabaseBackedPagingProvider extends BasePagingProvider implements
this();
}
/**
* Constructor
*/
public DatabaseBackedPagingProvider() {
super();
}
@Override
public synchronized IBundleProvider retrieveResultList(String theId) {
PersistedJpaBundleProvider provider = new PersistedJpaBundleProvider(theId, myDao);

View File

@ -1,5 +1,17 @@
package ca.uhn.fhir.jpa.search;
public class ISearchCoordinatorSvc {
import java.util.List;
import org.hl7.fhir.instance.model.api.IBaseResource;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.rest.server.IBundleProvider;
public interface ISearchCoordinatorSvc {
List<Long> getResources(String theUuid, int theFrom, int theTo);
IBundleProvider registerSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType);
}

View File

@ -10,7 +10,7 @@ package ca.uhn.fhir.jpa.search;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -29,9 +29,7 @@ import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.SerializationUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
@ -41,23 +39,24 @@ import org.springframework.transaction.support.TransactionTemplate;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.SearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.entity.BaseHasResource;
import ca.uhn.fhir.jpa.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.rest.server.IBundleProvider;
public final class PersistedJpaBundleProvider implements IBundleProvider {
public class PersistedJpaBundleProvider implements IBundleProvider {
private FhirContext myContext;
private IDao myDao;
private EntityManager myEntityManager;
private PlatformTransactionManager myPlatformTransactionManager;
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
private ISearchDao mySearchDao;
private Search mySearchEntity;
private ISearchResultDao mySearchResultDao;
private String myUuid;
public PersistedJpaBundleProvider(String theSearchUuid, IDao theDao) {
@ -115,27 +114,21 @@ public final class PersistedJpaBundleProvider implements IBundleProvider {
}
protected List<IBaseResource> doSearchOrEverythingInTransaction(final int theFromIndex, final int theToIndex) {
SearchBuilder sb = myDao.newSearchBuilder();
ISearchBuilder sb = myDao.newSearchBuilder();
String resourceName = mySearchEntity.getResourceType();
Class<? extends IBaseResource> resourceType = myContext.getResourceDefinition(resourceName).getImplementingClass();
sb.setType(resourceType, resourceName);
// SearchParameterMap parameterMap = SerializationUtils.deserialize(mySearchEntity.getSearchParamMap());
// List<Long> pidsSubList = sb.loadSearchPage(parameterMap, theFromIndex, theToIndex);());
List<Long> pidsSubList = null;
List<Long> pidsSubList = mySearchCoordinatorSvc.getResources(myUuid, theFromIndex, theToIndex);
Set<Long> includedPids = new HashSet<Long>();
if (mySearchEntity.getSearchType() == SearchTypeEnum.SEARCH) {
includedPids.addAll(SearchBuilder.loadReverseIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toRevIncludesList(), true, mySearchEntity.getLastUpdated()));
return toResourceList(sb, pidsSubList);
}
private void ensureDependenciesInjected() {
if (myPlatformTransactionManager == null) {
myDao.injectDependenciesIntoBundleProvider(this);
}
includedPids.addAll(SearchBuilder.loadReverseIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toIncludesList(), false, mySearchEntity.getLastUpdated()));
// Execute the query and make sure we return distinct results
List<IBaseResource> resources = new ArrayList<IBaseResource>();
SearchBuilder.loadResourcesByPid(pidsSubList, resources, includedPids, false, myEntityManager, myContext, myDao);
return resources;
}
/**
@ -151,7 +144,7 @@ public final class PersistedJpaBundleProvider implements IBundleProvider {
@Override
public Boolean doInTransaction(TransactionStatus theStatus) {
try {
mySearchEntity = mySearchDao.findByUuid(myUuid);
setSearchEntity(mySearchDao.findByUuid(myUuid));
if (mySearchEntity == null) {
return false;
@ -170,12 +163,6 @@ public final class PersistedJpaBundleProvider implements IBundleProvider {
return true;
}
private void ensureDependenciesInjected() {
if (myPlatformTransactionManager == null) {
myDao.injectDependenciesIntoBundleProvider(this);
}
}
@Override
public InstantDt getPublished() {
ensureSearchEntityLoaded();
@ -228,36 +215,40 @@ public final class PersistedJpaBundleProvider implements IBundleProvider {
myPlatformTransactionManager = thePlatformTransactionManager;
}
public void setSearchCoordinatorSvc(ISearchCoordinatorSvc theSearchCoordinatorSvc) {
mySearchCoordinatorSvc = theSearchCoordinatorSvc;
}
public void setSearchDao(ISearchDao theSearchDao) {
mySearchDao = theSearchDao;
}
public void setSearchResultDao(ISearchResultDao theSearchResultDao) {
mySearchResultDao = theSearchResultDao;
protected void setSearchEntity(Search theSearchEntity) {
mySearchEntity = theSearchEntity;
}
@Override
public Integer size() {
ensureSearchEntityLoaded();
return Math.max(0, mySearchEntity.getTotalCount());
}
static Pageable toPage(final int theFromIndex, int theToIndex) {
int pageSize = theToIndex - theFromIndex;
if (pageSize < 1) {
Integer size = mySearchEntity.getTotalCount();
if (size == null) {
return null;
}
int pageIndex = theFromIndex / pageSize;
Pageable page = new PageRequest(pageIndex, pageSize) {
private static final long serialVersionUID = 1L;
@Override
public int getOffset() {
return theFromIndex;
}};
return page;
return Math.max(0, size);
}
protected List<IBaseResource> toResourceList(ISearchBuilder sb, List<Long> pidsSubList) {
Set<Long> includedPids = new HashSet<Long>();
if (mySearchEntity.getSearchType() == SearchTypeEnum.SEARCH) {
includedPids.addAll(sb.loadReverseIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toRevIncludesList(), true, mySearchEntity.getLastUpdated()));
}
includedPids.addAll(sb.loadReverseIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toIncludesList(), false, mySearchEntity.getLastUpdated()));
// Execute the query and make sure we return distinct results
List<IBaseResource> resources = new ArrayList<IBaseResource>();
sb.loadResourcesByPid(pidsSubList, resources, includedPids, false, myEntityManager, myContext, myDao);
return resources;
}
}

View File

@ -0,0 +1,49 @@
package ca.uhn.fhir.jpa.search;
import java.util.List;
import org.hl7.fhir.instance.model.api.IBaseResource;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl.SearchTask;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundleProvider {
private SearchTask mySearchTask;
private ISearchBuilder mySearchBuilder;
private Search mySearch;
public PersistedJpaSearchFirstPageBundleProvider(Search theSearch, IDao theDao, SearchTask theSearchTask, ISearchBuilder theSearchBuilder) {
super(theSearch.getUuid(), theDao);
setSearchEntity(theSearch);
mySearchTask = theSearchTask;
mySearchBuilder = theSearchBuilder;
mySearch = theSearch;
}
@Override
public List<IBaseResource> getResources(int theFromIndex, int theToIndex) {
checkForFailedSearch();
List<Long> pids = mySearchTask.getResourcePids(theFromIndex, theToIndex);
return toResourceList(mySearchBuilder, pids);
}
@Override
public Integer size() {
mySearchTask.awaitInitialSync();
checkForFailedSearch();
return super.size();
}
private void checkForFailedSearch() {
if (mySearch.getStatus() == SearchStatusEnum.FAILED) {
throw new InternalErrorException("Failure while loading search results");
}
}
}

View File

@ -1,40 +1,134 @@
package ca.uhn.fhir.jpa.search;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.*;
import java.util.concurrent.*;
import javax.persistence.EntityManager;
import javax.transaction.Transactional.TxType;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.id.ResultSetIdentifierConsumer;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.servlet.ThemeResolver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.SearchBuilder;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.IResourceIndexedSearchParamUriDao;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.method.PageMethodBinding;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
public class SearchCoordinatorSvcImpl {
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
@Autowired
private FhirContext myContext;
@Autowired
private EntityManager myEntityManager;
private ExecutorService myExecutor;
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>();
private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
private final List<CountDownLatch> myResultSizeLatch = new ArrayList<CountDownLatch>();
@Autowired
private ISearchDao mySearchDao;
@Autowired
private ISearchIncludeDao mySearchIncludeDao;
@Autowired
private ISearchResultDao mySearchResultDao;
@Autowired
private PlatformTransactionManager myTxManager;
/**
* Constructor
*/
public SearchCoordinatorSvcImpl() {
CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
myExecutor = Executors.newCachedThreadPool(threadFactory);
}
public IBundleProvider registerSearch(IDao theCallingDao, SearchParameterMap theParams) {
@Override
public List<Long> getResources(String theUuid, int theFrom, int theTo) {
SearchTask task = myIdToSearchTask.get(theUuid);
if (task != null) {
return task.getResourcePids(theFrom, theTo);
}
Search search;
StopWatch sw = new StopWatch();
while (true) {
search = mySearchDao.findByUuid(theUuid);
if (search == null) {
ourLog.info("Client requested unknown paging ID[{}]", theUuid);
String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
throw new ResourceGoneException(msg);
}
verifySearchHasntFailedOrThrowInternalErrorException(search);
if (search.getStatus() == SearchStatusEnum.FINISHED) {
break;
}
if (search.getNumFound() >= theTo) {
break;
}
if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}
Pageable page = toPage(theFrom, theTo);
if (page == null) {
return Collections.emptyList();
}
Page<SearchResult> searchResults = mySearchResultDao.findWithSearchUuid(search, page);
List<Long> retVal = new ArrayList<Long>();
for (SearchResult next : searchResults) {
retVal.add(next.getResourcePid());
}
return retVal;
}
@Override
public IBundleProvider registerSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
StopWatch w = new StopWatch();
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
ISearchBuilder sb = theCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, theResourceType);
Iterator<Long> resultIter = sb.createQuery(theParams);
if (theParams.isLoadSynchronous()) {
SearchBuilder sb = theCallingDao.newSearchBuilder();
Iterator<Long> resultIter = sb.createQuery(theParams);
// Load the results synchronously
List<Long> pids = new ArrayList<Long>();
@ -45,39 +139,308 @@ public class SearchCoordinatorSvcImpl {
}
}
resources = sb.loadResourcesByPid(pids, theResourceListToPopulate, theRevIncludedPids, theForHistoryOperation, entityManager, context, theDao);
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
Set<Long> includedPids = new HashSet<Long>();
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
// Execute the query and make sure we return distinct results
List<IBaseResource> resources = new ArrayList<IBaseResource>();
sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
return new SimpleBundleProvider(resources);
}
mySearchEntity = new Search();
mySearchEntity.setUuid(UUID.randomUUID().toString());
mySearchEntity.setCreated(new Date());
mySearchEntity.setTotalCount(-1);
mySearchEntity.setPreferredPageSize(myParams.getCount());
mySearchEntity.setSearchType(myParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
mySearchEntity.setLastUpdated(myParams.getLastUpdated());
mySearchEntity.setResourceType(myResourceName);
Search search = new Search();
search.setUuid(UUID.randomUUID().toString());
search.setCreated(new Date());
search.setTotalCount(null);
search.setNumFound(0);
search.setPreferredPageSize(theParams.getCount());
search.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
search.setLastUpdated(theParams.getLastUpdated());
search.setResourceType(theResourceType);
search.setStatus(SearchStatusEnum.LOADING);
for (Include next : myParams.getIncludes()) {
mySearchEntity.getIncludes().add(new SearchInclude(mySearchEntity, next.getValue(), false, next.isRecurse()));
for (Include next : theParams.getIncludes()) {
search.getIncludes().add(new SearchInclude(search, next.getValue(), false, next.isRecurse()));
}
for (Include next : myParams.getRevIncludes()) {
mySearchEntity.getIncludes().add(new SearchInclude(mySearchEntity, next.getValue(), true, next.isRecurse()));
for (Include next : theParams.getRevIncludes()) {
search.getIncludes().add(new SearchInclude(search, next.getValue(), true, next.isRecurse()));
}
List<Long> firstPage = loadSearchPage(theParams, 0, 999);
mySearchEntity.setTotalCount(firstPage.size());
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType);
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
myEntityManager.persist(mySearchEntity);
for (SearchInclude next : mySearchEntity.getIncludes()) {
myEntityManager.persist(next);
}
IBundleProvider retVal = doReturnProvider();
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb);
retVal.setContext(myContext);
retVal.setEntityManager(myEntityManager);
retVal.setPlatformTransactionManager(myTxManager);
retVal.setSearchDao(mySearchDao);
retVal.setSearchCoordinatorSvc(this);
ourLog.info("Search initial phase completed in {}ms", w);
return retVal;
}
@VisibleForTesting
void setContextForUnitTest(FhirContext theCtx) {
myContext = theCtx;
}
@VisibleForTesting
void setEntityManagerForUnitTest(EntityManager theEntityManager) {
myEntityManager = theEntityManager;
}
@VisibleForTesting
void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
}
@VisibleForTesting
void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
mySearchDao = theSearchDao;
}
@VisibleForTesting
void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
mySearchIncludeDao = theSearchIncludeDao;
}
@VisibleForTesting
void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
mySearchResultDao = theSearchResultDao;
}
@VisibleForTesting
void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
myTxManager = theTxManager;
}
static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
throw new InternalErrorException(theSearch.getFailureMessage());
}
}
public class SearchTask implements Callable<Void> {
private static final int SYNC_SIZE = 250;
private int myCountSaved = 0;
private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
private final Search mySearch;
private final ArrayList<Long> mySyncedPids = new ArrayList<Long>();
private final ArrayList<Long> myUnsyncedPids = new ArrayList<Long>();
private final IDao myCallingDao;
private SearchParameterMap myParams;
private String myResourceType;
public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
mySearch = theSearch;
myCallingDao = theCallingDao;
myParams = theParams;
myResourceType = theResourceType;
}
public void awaitInitialSync() {
ourLog.info("Awaiting initial sync");
do {
try {
if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
throw new InternalErrorException(e);
}
} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
ourLog.info("Initial sync completed");
}
@Override
public Void call() throws Exception {
try {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
doSearch();
}
});
} catch (Throwable t) {
ourLog.error("Failed during search loading", t);
myUnsyncedPids.clear();
mySearch.setStatus(SearchStatusEnum.FAILED);
String failureMessage = ExceptionUtils.getRootCauseMessage(t);
mySearch.setFailureMessage(failureMessage);
saveSearch();
}
myIdToSearchTask.remove(mySearch.getUuid());
return null;
}
private void doSearch() {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = myCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, myResourceType);
Iterator<Long> theResultIter = sb.createQuery(myParams);
while (theResultIter.hasNext()) {
myUnsyncedPids.add(theResultIter.next());
if (myUnsyncedPids.size() >= SYNC_SIZE) {
saveUnsynced(theResultIter);
}
}
saveUnsynced(theResultIter);
}
private void doSaveSearch() {
if (mySearch.getId() == null) {
mySearchDao.save(mySearch);
for (SearchInclude next : mySearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
mySearchDao.save(mySearch);
}
}
public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
CountDownLatch latch = null;
synchronized (mySyncedPids) {
if (mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING) {
int latchSize = theToIndex - mySyncedPids.size();
ourLog.info("Registering latch to await {} results (want {} total)", latchSize, theToIndex);
latch = new CountDownLatch(latchSize);
myResultSizeLatch.add(latch);
}
}
if (latch != null) {
while (latch.getCount() > 0 && mySearch.getStatus() == SearchStatusEnum.LOADING) {
try {
ourLog.trace("Awaiting latch with {}", latch.getCount());
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ok
}
}
}
ArrayList<Long> retVal = new ArrayList<Long>();
synchronized (mySyncedPids) {
verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
int toIndex = theToIndex;
if (mySyncedPids.size() < toIndex) {
toIndex = mySyncedPids.size();
}
for (int i = theFromIndex; i < toIndex; i++) {
retVal.add(mySyncedPids.get(i));
}
}
return retVal;
}
private void saveSearch() {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
doSaveSearch();
}
});
}
private void saveUnsynced(final Iterator<Long> theResultIter) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
if (mySearch.getId() == null) {
doSaveSearch();
}
List<SearchResult> resultsToSave = Lists.newArrayList();
for (Long nextPid : myUnsyncedPids) {
SearchResult nextResult = new SearchResult(mySearch);
nextResult.setResourcePid(nextPid);
nextResult.setOrder(myCountSaved++);
resultsToSave.add(nextResult);
}
mySearchResultDao.save(resultsToSave);
synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size();
ourLog.info("Syncing {} search results", numSyncedThisPass);
mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear();
if (theResultIter.hasNext() == false) {
mySearch.setStatus(SearchStatusEnum.FINISHED);
mySearch.setTotalCount(myCountSaved);
for (CountDownLatch next : myResultSizeLatch) {
while (next.getCount() > 0) {
next.countDown();
}
}
} else {
if (myResultSizeLatch.isEmpty() == false) {
for (CountDownLatch next : myResultSizeLatch) {
for (int i = 0; i < numSyncedThisPass; i++) {
next.countDown();
}
}
}
}
}
mySearch.setNumFound(myCountSaved);
doSaveSearch();
}
});
myInitialCollectionLatch.countDown();
}
}
static Pageable toPage(final int theFromIndex, int theToIndex) {
int pageSize = theToIndex - theFromIndex;
if (pageSize < 1) {
return null;
}
int pageIndex = theFromIndex / pageSize;
Pageable page = new PageRequest(pageIndex, pageSize) {
private static final long serialVersionUID = 1L;
@Override
public int getOffset() {
return theFromIndex;
}
};
return page;
}
}

View File

@ -1,19 +0,0 @@
package ca.uhn.fhir.jpa.search;
import static org.junit.Assert.*;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.data.domain.Pageable;
public class PersistedJpaBundleProviderTest {
@Test
@Ignore
public void testGetPage() {
Pageable page = PersistedJpaBundleProvider.toPage(50, 73);
assertEquals(50, page.getOffset());
// assertEquals(50, page.get);
}
}

View File

@ -0,0 +1,468 @@
package ca.uhn.fhir.jpa.search;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.*;
import javax.persistence.EntityManager;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
import com.google.common.collect.Lists;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchResult;
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.TestUtil;
@SuppressWarnings({ "unchecked" })
@RunWith(MockitoJUnitRunner.class)
public class SearchCoordinatorSvcImplTest {
private static FhirContext ourCtx = FhirContext.forDstu3();
@Mock
private IDao myCallingDao;
@Mock
private EntityManager myEntityManager;
private int myExpectedNumberOfSearchBuildersCreated = 1;
@Mock
private ISearchBuilder mySearchBuider;
@Mock
private ISearchDao mySearchDao;
@Mock
private ISearchIncludeDao mySearchIncludeDao;
@Mock
private ISearchResultDao mySearchResultDao;
@Captor
ArgumentCaptor<Iterable<SearchResult>> mySearchResultIterCaptor;
private SearchCoordinatorSvcImpl mySvc;
@Mock
private PlatformTransactionManager myTxManager;
@After
public void after() {
verify(myCallingDao, atMost(myExpectedNumberOfSearchBuildersCreated)).newSearchBuilder();
}
@Before
public void before() {
mySvc = new SearchCoordinatorSvcImpl();
mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx);
mySvc.setSearchDaoForUnitTest(mySearchDao);
mySvc.setSearchDaoIncludeForUnitTest(mySearchIncludeDao);
mySvc.setSearchDaoResultForUnitTest(mySearchResultDao);
when(myCallingDao.newSearchBuilder()).thenReturn(mySearchBuider);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock theInvocation) throws Throwable {
PersistedJpaBundleProvider provider = (PersistedJpaBundleProvider) theInvocation.getArguments()[0];
provider.setSearchCoordinatorSvc(mySvc);
provider.setPlatformTransactionManager(myTxManager);
provider.setSearchDao(mySearchDao);
provider.setEntityManager(myEntityManager);
provider.setContext(ourCtx);
return null;
}}).when(myCallingDao).injectDependenciesIntoBundleProvider(any(PersistedJpaBundleProvider.class));
}
private List<Long> createPidSequence(int from, int to) {
List<Long> pids = new ArrayList<Long>();
for (long i = from; i < to; i++) {
pids.add(i);
}
return pids;
}
private Answer<Void> loadPids() {
Answer<Void> retVal = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock theInvocation) throws Throwable {
List<Long> pids = (List<Long>) theInvocation.getArguments()[0];
List<IBaseResource> resources = (List<IBaseResource>) theInvocation.getArguments()[1];
for (Long nextPid : pids) {
Patient pt = new Patient();
pt.setId(nextPid.toString());
resources.add(pt);
}
return null;
}
};
return retVal;
}
@Test
public void testAsyncSearchFailDuringSearchSameCoordinator() {
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new FailAfterNIterator<Long>(new SlowIterator<Long>(pids.iterator(), 2), 300);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNotNull(result.getUuid());
assertEquals(null, result.size());
try {
result.getResources(0, 100000);
} catch (InternalErrorException e) {
assertEquals("NullPointerException: FAILED", e.getMessage());
}
}
@Test
public void testAsyncSearchLargeResultSetBigCountSameCoordinator() {
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNotNull(result.getUuid());
assertEquals(null, result.size());
List<IBaseResource> resources;
resources = result.getResources(0, 100000);
assertEquals(790, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("799", resources.get(789).getIdElement().getValueAsString());
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
verify(mySearchDao, atLeastOnce()).save(searchCaptor.capture());
verify(mySearchResultDao, atLeastOnce()).save(mySearchResultIterCaptor.capture());
List<SearchResult> allResults= new ArrayList<SearchResult>();
for (Iterable<SearchResult> next : mySearchResultIterCaptor.getAllValues()) {
allResults.addAll(Lists.newArrayList(next));
}
assertEquals(790, allResults.size());
assertEquals(10, allResults.get(0).getResourcePid().longValue());
assertEquals(799, allResults.get(789).getResourcePid().longValue());
}
@Test
public void testAsyncSearchLargeResultSetSameCoordinator() {
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
SlowIterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNotNull(result.getUuid());
assertEquals(null, result.size());
List<IBaseResource> resources;
resources = result.getResources(0, 30);
assertEquals(30, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("39", resources.get(29).getIdElement().getValueAsString());
}
/**
* Subsequent requests for the same search (i.e. a request for the next
* page) within the same JVM will not use the original bundle provider
*/
@Test
public void testAsyncSearchLargeResultSetSecondRequestSameCoordinator() {
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
Iterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNotNull(result.getUuid());
assertEquals(null, result.size());
ArgumentCaptor<Search> searchCaptor = ArgumentCaptor.forClass(Search.class);
verify(mySearchDao, atLeast(1)).save(searchCaptor.capture());
Search search = searchCaptor.getValue();
assertEquals(SearchTypeEnum.SEARCH, search.getSearchType());
List<IBaseResource> resources;
PersistedJpaBundleProvider provider;
resources = result.getResources(0, 10);
assertNull(result.size());
assertEquals(10, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("19", resources.get(9).getIdElement().getValueAsString());
when(mySearchDao.findByUuid(eq(result.getUuid()))).thenReturn(search);
/*
* Now call from a new bundle provider. This simulates a separate HTTP
* client request coming in.
*/
provider = new PersistedJpaBundleProvider(result.getUuid(), myCallingDao);
resources = provider.getResources(10, 20);
assertEquals(10, resources.size());
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
assertEquals("29", resources.get(9).getIdElement().getValueAsString());
provider = new PersistedJpaBundleProvider(result.getUuid(), myCallingDao);
resources = provider.getResources(20, 99999);
assertEquals(770, resources.size());
assertEquals("30", resources.get(0).getIdElement().getValueAsString());
assertEquals("799", resources.get(769).getIdElement().getValueAsString());
myExpectedNumberOfSearchBuildersCreated = 3;
}
@Test
public void testAsyncSearchSmallResultSetSameCoordinator() {
SearchParameterMap params = new SearchParameterMap();
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 100);
SlowIterator<Long> iter = new SlowIterator<Long>(pids.iterator(), 2);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(iter);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNotNull(result.getUuid());
assertEquals(90, result.size().intValue());
List<IBaseResource> resources = result.getResources(0, 30);
assertEquals(30, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("39", resources.get(29).getIdElement().getValueAsString());
}
@Test
public void testGetPage() {
Pageable page = SearchCoordinatorSvcImpl.toPage(50, 73);
assertEquals(50, page.getOffset());
}
@Test
public void testLoadSearchResultsFromDifferentCoordinator() {
final String uuid = UUID.randomUUID().toString();
final Search search = new Search();
search.setUuid(uuid);
search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient");
when(mySearchDao.findByUuid(eq(uuid))).thenReturn(search);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
PersistedJpaBundleProvider provider;
List<IBaseResource> resources;
new Thread() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
when(mySearchResultDao.findWithSearchUuid(any(Search.class), any(Pageable.class))).thenAnswer(new Answer<Page<SearchResult>>() {
@Override
public Page<SearchResult> answer(InvocationOnMock theInvocation) throws Throwable {
Pageable page = (Pageable) theInvocation.getArguments()[1];
ArrayList<SearchResult> results = new ArrayList<SearchResult>();
int max = (page.getPageNumber() * page.getPageSize()) + page.getPageSize();
for (int i = page.getOffset(); i < max; i++) {
results.add(new SearchResult().setResourcePid(i + 10L));
}
return new PageImpl<SearchResult>(results);
}});
search.setStatus(SearchStatusEnum.FINISHED);
}
}.start();
/*
* Now call from a new bundle provider. This simulates a separate HTTP
* client request coming in.
*/
provider = new PersistedJpaBundleProvider(uuid, myCallingDao);
resources = provider.getResources(10, 20);
assertEquals(10, resources.size());
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
assertEquals("29", resources.get(9).getIdElement().getValueAsString());
provider = new PersistedJpaBundleProvider(uuid, myCallingDao);
resources = provider.getResources(20, 40);
assertEquals(20, resources.size());
assertEquals("30", resources.get(0).getIdElement().getValueAsString());
assertEquals("49", resources.get(19).getIdElement().getValueAsString());
myExpectedNumberOfSearchBuildersCreated = 3;
}
@Test
public void testSynchronousSearch() {
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronous(true);
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(pids.iterator());
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(eq(pids), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNull(result.getUuid());
assertEquals(790, result.size().intValue());
List<IBaseResource> resources = result.getResources(0, 10000);
assertEquals(790, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("799", resources.get(789).getIdElement().getValueAsString());
}
@Test
public void testSynchronousSearchUpTo() {
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(100);
params.add("name", new StringParam("ANAME"));
List<Long> pids = createPidSequence(10, 800);
when(mySearchBuider.createQuery(Mockito.same(params))).thenReturn(pids.iterator());
pids = createPidSequence(10, 110);
doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(eq(pids), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient");
assertNull(result.getUuid());
assertEquals(100, result.size().intValue());
List<IBaseResource> resources = result.getResources(0, 10000);
assertEquals(100, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("109", resources.get(99).getIdElement().getValueAsString());
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
public static class FailAfterNIterator<T> implements Iterator<T> {
private int myCount;
private Iterator<T> myWrap;
public FailAfterNIterator(Iterator<T> theWrap, int theCount) {
myWrap = theWrap;
myCount = theCount;
}
@Override
public boolean hasNext() {
return myWrap.hasNext();
}
@Override
public T next() {
myCount--;
if (myCount == 0) {
throw new NullPointerException("FAILED");
}
return myWrap.next();
}
}
public static class SlowIterator<T> implements Iterator<T> {
private int myDelay;
private Iterator<T> myWrap;
public SlowIterator(Iterator<T> theWrap, int theDelay) {
myWrap = theWrap;
myDelay = theDelay;
}
@Override
public boolean hasNext() {
return myWrap.hasNext();
}
@Override
public T next() {
try {
Thread.sleep(myDelay);
} catch (InterruptedException e) {
// ignore
}
return myWrap.next();
}
}
}