3637 encapsulation of synchronous query execution capabilities into a new service for reuse (#3638)

* Adding test reproducing the issue.

* Providing implementation fixing the issue and strengthening integrated test.

* Cleaning up commended code.

* Preparing code for review.

* Adding test class SynchronousSearchSvcImplTest.

* Addressing first code review comments.

* Addressing first code review comments.

* Forcing one extra processing step to ensure exit condition is met.

* Adding tests

* Clean up in preparation of second code review.

* Addition of setting page size if required.

* Grouping synch test in nested test class.

Co-authored-by: peartree <etienne.poirier@smilecdr.com>
This commit is contained in:
Etienne Poirier 2022-06-05 20:38:53 -04:00 committed by GitHub
parent 1a90c7e9ce
commit 868a7e5c40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 811 additions and 346 deletions

View File

@ -89,10 +89,12 @@ import ca.uhn.fhir.jpa.provider.r4.MemberMatcherR4Helper;
import ca.uhn.fhir.jpa.reindex.ResourceReindexSvcImpl; import ca.uhn.fhir.jpa.reindex.ResourceReindexSvcImpl;
import ca.uhn.fhir.jpa.sched.AutowiringSpringBeanJobFactory; import ca.uhn.fhir.jpa.sched.AutowiringSpringBeanJobFactory;
import ca.uhn.fhir.jpa.sched.HapiSchedulerServiceImpl; import ca.uhn.fhir.jpa.sched.HapiSchedulerServiceImpl;
import ca.uhn.fhir.jpa.search.ISynchronousSearchSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider; import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProviderFactory; import ca.uhn.fhir.jpa.search.PersistedJpaBundleProviderFactory;
import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl; import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
import ca.uhn.fhir.jpa.search.SynchronousSearchSvcImpl;
import ca.uhn.fhir.jpa.search.builder.QueryStack; import ca.uhn.fhir.jpa.search.builder.QueryStack;
import ca.uhn.fhir.jpa.search.builder.SearchBuilder; import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
import ca.uhn.fhir.jpa.search.builder.predicate.ComboNonUniqueSearchParameterPredicateBuilder; import ca.uhn.fhir.jpa.search.builder.predicate.ComboNonUniqueSearchParameterPredicateBuilder;
@ -835,4 +837,9 @@ public class JpaConfig {
public NicknameInterceptor nicknameInterceptor() throws IOException { public NicknameInterceptor nicknameInterceptor() throws IOException {
return new NicknameInterceptor(); return new NicknameInterceptor();
} }
@Bean
public ISynchronousSearchSvc synchronousSearchSvc(){
return new SynchronousSearchSvcImpl();
}
} }

View File

@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
public interface ISynchronousSearchSvc extends ISearchSvc {
IBundleProvider executeQuery(SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo, RequestPartitionId theRequestPartitionId);
}

View File

@ -47,24 +47,20 @@ import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum; import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.rest.api.SummaryEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.IPagingProvider; import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.interceptor.ServerInterceptorUtil;
import ca.uhn.fhir.rest.server.method.PageMethodBinding; import ca.uhn.fhir.rest.server.method.PageMethodBinding;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
@ -100,7 +96,6 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
@ -117,6 +112,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -125,15 +123,14 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
public static final int DEFAULT_SYNC_SIZE = 250; public static final int DEFAULT_SYNC_SIZE = 250;
public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack"; public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack";
public static final Integer INTEGER_0 = 0;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
@Autowired @Autowired
private FhirContext myContext; private FhirContext myContext;
@Autowired @Autowired
private DaoConfig myDaoConfig; private DaoConfig myDaoConfig;
@Autowired
private EntityManager myEntityManager;
private Integer myLoadingThrottleForUnitTests = null; private Integer myLoadingThrottleForUnitTests = null;
private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
private boolean myNeverUseLocalSearchForUnitTests; private boolean myNeverUseLocalSearchForUnitTests;
@ -152,6 +149,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Autowired @Autowired
private SearchBuilderFactory mySearchBuilderFactory; private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private ISynchronousSearchSvc mySynchronousSearchSvc;
private int mySyncSize = DEFAULT_SYNC_SIZE; private int mySyncSize = DEFAULT_SYNC_SIZE;
/** /**
* Set in {@link #start()} * Set in {@link #start()}
@ -340,7 +340,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
ourLog.debug("Search {} is loading in synchronous mode", searchUuid); ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
return executeQuery(theResourceType, theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); return mySynchronousSearchSvc.executeQuery(theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
} }
/* /*
@ -508,152 +508,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
return candidate.orElse(null); return candidate.orElse(null);
} }
private IBundleProvider executeQuery(String theResourceType, SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo, RequestPartitionId theRequestPartitionId) {
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid);
searchRuntimeDetails.setLoadSynchronous(true);
boolean wantOnlyCount = isWantOnlyCount(theParams);
boolean wantCount = isWantCount(theParams, wantOnlyCount);
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.setReadOnly(theParams.isLoadSynchronous() || theParams.isOffsetQuery());
return txTemplate.execute(t -> {
// Load the results synchronously
final List<ResourcePersistentId> pids = new ArrayList<>();
Long count = 0L;
if (wantCount) {
ourLog.trace("Performing count");
// TODO FulltextSearchSvcImpl will remove necessary parameters from the "theParams", this will cause actual query after count to
// return wrong response. This is some dirty fix to avoid that issue. Params should not be mutated?
// Maybe instead of removing them we could skip them in db query builder if full text search was used?
List<List<IQueryParameterType>> contentAndTerms = theParams.get(Constants.PARAM_CONTENT);
List<List<IQueryParameterType>> textAndTerms = theParams.get(Constants.PARAM_TEXT);
count = theSb.createCountQuery(theParams, theSearchUuid, theRequestDetails, theRequestPartitionId);
if (contentAndTerms != null) theParams.put(Constants.PARAM_CONTENT, contentAndTerms);
if (textAndTerms != null) theParams.put(Constants.PARAM_TEXT, textAndTerms);
ourLog.trace("Got count {}", count);
}
if (wantOnlyCount) {
SimpleBundleProvider bundleProvider = new SimpleBundleProvider();
bundleProvider.setSize(count.intValue());
return bundleProvider;
}
try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails, theRequestPartitionId)) {
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
throw new InternalErrorException(Msg.code(1164) + e);
}
JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> theSb);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
for (int i = pids.size() - 1; i >= 0; i--) {
if (accessDetails.isDontReturnResourceAtIndex(i)) {
pids.remove(i);
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because synchronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
// _includes
Integer maxIncludes = myDaoConfig.getMaximumIncludesToLoadPerPage();
final Set<ResourcePersistentId> includedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
if (maxIncludes != null) {
maxIncludes -= includedPids.size();
}
pids.addAll(includedPids);
List<ResourcePersistentId> includedPidsList = new ArrayList<>(includedPids);
// _revincludes
if (theParams.getEverythingMode() == null && (maxIncludes == null || maxIncludes > 0)) {
Set<ResourcePersistentId> revIncludedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
includedPids.addAll(revIncludedPids);
pids.addAll(revIncludedPids);
includedPidsList.addAll(revIncludedPids);
}
List<IBaseResource> resources = new ArrayList<>();
theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
// Hook: STORAGE_PRESHOW_RESOURCES
resources = ServerInterceptorUtil.fireStoragePreshowResource(resources, theRequestDetails, myInterceptorBroadcaster);
SimpleBundleProvider bundleProvider = new SimpleBundleProvider(resources);
if (theParams.isOffsetQuery()) {
bundleProvider.setCurrentPageOffset(theParams.getOffset());
bundleProvider.setCurrentPageSize(theParams.getCount());
ourLog.warn("Query from search {} is using _offset, may result in duplicate entries across different pages.", theSearchUuid);
}
if (wantCount) {
bundleProvider.setSize(count.intValue());
} else {
Integer queryCount = getQueryCount(theLoadSynchronousUpTo, theParams);
if (queryCount == null || queryCount > resources.size()) {
// No limit, last page or everything was fetched within the limit
bundleProvider.setSize(getTotalCount(queryCount, theParams.getOffset(), resources.size()));
} else {
bundleProvider.setSize(null);
}
}
bundleProvider.setPreferredPageSize(theParams.getCount());
return bundleProvider;
});
}
private int getTotalCount(Integer queryCount, Integer offset, int queryResultCount) {
if (queryCount != null) {
if (offset != null) {
return offset + queryResultCount;
} else {
return queryResultCount;
}
} else {
return queryResultCount;
}
}
private Integer getQueryCount(Integer theLoadSynchronousUpTo, SearchParameterMap theParams) {
if (theLoadSynchronousUpTo != null) {
return theLoadSynchronousUpTo;
} else if (theParams.getCount() != null) {
return theParams.getCount();
} else if (myDaoConfig.getFetchSizeDefaultMaximum() != null) {
return myDaoConfig.getFetchSizeDefaultMaximum();
}
return null;
}
@Nullable @Nullable
private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
final Integer loadSynchronousUpTo; final Integer loadSynchronousUpTo;
@ -682,11 +536,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myDaoConfig = theDaoConfig; myDaoConfig = theDaoConfig;
} }
@VisibleForTesting
void setEntityManagerForUnitTest(EntityManager theEntityManager) {
myEntityManager = theEntityManager;
}
@VisibleForTesting @VisibleForTesting
public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
@ -732,15 +581,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myRequestPartitionHelperService = theRequestPartitionHelperService; myRequestPartitionHelperService = theRequestPartitionHelperService;
} }
private boolean isWantCount(SearchParameterMap myParams, boolean wantOnlyCount) { @VisibleForTesting
return wantOnlyCount || public void setSynchronousSearchSvc(ISynchronousSearchSvc theSynchronousSearchSvc) {
SearchTotalModeEnum.ACCURATE.equals(myParams.getSearchTotalMode()) || mySynchronousSearchSvc = theSynchronousSearchSvc;
(myParams.getSearchTotalMode() == null && SearchTotalModeEnum.ACCURATE.equals(myDaoConfig.getDefaultTotalMode()));
}
private static boolean isWantOnlyCount(SearchParameterMap myParams) {
return SummaryEnum.COUNT.equals(myParams.getSummaryMode())
| INTEGER_0.equals(myParams.getCount());
} }
public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) { public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) {
@ -827,7 +670,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
} }
/** /**
* A search task is a Callable task that runs in * A search task is a Callable task that runs in
* a thread pool to handle an individual search. One instance * a thread pool to handle an individual search. One instance
* is created for any requested search and runs from the * is created for any requested search and runs from the
@ -1219,9 +1062,10 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
* *
* before doing anything else. * before doing anything else.
*/ */
boolean wantOnlyCount = isWantOnlyCount(myParams); boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
boolean wantCount = isWantCount(myParams, wantOnlyCount); boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) ? isWantCount(myParams) : isWantCount(myDaoConfig.getDefaultTotalMode());
if (wantCount) {
if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
ourLog.trace("Performing count"); ourLog.trace("Performing count");
ISearchBuilder sb = newSearchBuilder(); ISearchBuilder sb = newSearchBuilder();
@ -1243,13 +1087,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Override @Override
protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) { protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) {
mySearch.setTotalCount(count.intValue()); mySearch.setTotalCount(count.intValue());
if (wantOnlyCount) { if (myParamWantOnlyCount) {
mySearch.setStatus(SearchStatusEnum.FINISHED); mySearch.setStatus(SearchStatusEnum.FINISHED);
} }
doSaveSearch(); doSaveSearch();
} }
}); });
if (wantOnlyCount) { if (myParamWantOnlyCount) {
return; return;
} }
} }

View File

@ -0,0 +1,236 @@
package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.interceptor.ServerInterceptorUtil;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
import static java.util.Objects.nonNull;
public class SynchronousSearchSvcImpl implements ISynchronousSearchSvc {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SynchronousSearchSvcImpl.class);
private FhirContext myContext;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private PlatformTransactionManager myManagedTxManager;
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private EntityManager myEntityManager;
private int mySyncSize = 250;
public IBundleProvider executeQuery(SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo, RequestPartitionId theRequestPartitionId) {
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid);
searchRuntimeDetails.setLoadSynchronous(true);
boolean theParamWantOnlyCount = isWantOnlyCount(theParams);
boolean theParamOrConfigWantCount = nonNull(theParams.getSearchTotalMode()) ? isWantCount(theParams) : isWantCount(myDaoConfig.getDefaultTotalMode());
boolean wantCount = theParamWantOnlyCount || theParamOrConfigWantCount;
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.setReadOnly(theParams.isLoadSynchronous() || theParams.isOffsetQuery());
return txTemplate.execute(t -> {
// Load the results synchronously
final List<ResourcePersistentId> pids = new ArrayList<>();
Long count = 0L;
if (wantCount) {
ourLog.trace("Performing count");
// TODO FulltextSearchSvcImpl will remove necessary parameters from the "theParams", this will cause actual query after count to
// return wrong response. This is some dirty fix to avoid that issue. Params should not be mutated?
// Maybe instead of removing them we could skip them in db query builder if full text search was used?
List<List<IQueryParameterType>> contentAndTerms = theParams.get(Constants.PARAM_CONTENT);
List<List<IQueryParameterType>> textAndTerms = theParams.get(Constants.PARAM_TEXT);
count = theSb.createCountQuery(theParams, theSearchUuid, theRequestDetails, theRequestPartitionId);
if (contentAndTerms != null) theParams.put(Constants.PARAM_CONTENT, contentAndTerms);
if (textAndTerms != null) theParams.put(Constants.PARAM_TEXT, textAndTerms);
ourLog.trace("Got count {}", count);
}
if (theParamWantOnlyCount) {
SimpleBundleProvider bundleProvider = new SimpleBundleProvider();
bundleProvider.setSize(count.intValue());
return bundleProvider;
}
try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails, theRequestPartitionId)) {
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) {
break;
}
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
throw new InternalErrorException(Msg.code(1164) + e);
}
JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> theSb);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
for (int i = pids.size() - 1; i >= 0; i--) {
if (accessDetails.isDontReturnResourceAtIndex(i)) {
pids.remove(i);
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because synchronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
// _includes
Integer maxIncludes = myDaoConfig.getMaximumIncludesToLoadPerPage();
final Set<ResourcePersistentId> includedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
if (maxIncludes != null) {
maxIncludes -= includedPids.size();
}
pids.addAll(includedPids);
List<ResourcePersistentId> includedPidsList = new ArrayList<>(includedPids);
// _revincludes
if (theParams.getEverythingMode() == null && (maxIncludes == null || maxIncludes > 0)) {
Set<ResourcePersistentId> revIncludedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
includedPids.addAll(revIncludedPids);
pids.addAll(revIncludedPids);
includedPidsList.addAll(revIncludedPids);
}
List<IBaseResource> resources = new ArrayList<>();
theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
// Hook: STORAGE_PRESHOW_RESOURCES
resources = ServerInterceptorUtil.fireStoragePreshowResource(resources, theRequestDetails, myInterceptorBroadcaster);
SimpleBundleProvider bundleProvider = new SimpleBundleProvider(resources);
if (theParams.isOffsetQuery()) {
bundleProvider.setCurrentPageOffset(theParams.getOffset());
bundleProvider.setCurrentPageSize(theParams.getCount());
}
if (wantCount) {
bundleProvider.setSize(count.intValue());
} else {
Integer queryCount = getQueryCount(theLoadSynchronousUpTo, theParams);
if (queryCount == null || queryCount > resources.size()) {
// No limit, last page or everything was fetched within the limit
bundleProvider.setSize(getTotalCount(queryCount, theParams.getOffset(), resources.size()));
} else {
bundleProvider.setSize(null);
}
}
bundleProvider.setPreferredPageSize(theParams.getCount());
return bundleProvider;
});
}
@Override
public IBundleProvider executeQuery(String theResourceType, SearchParameterMap theSearchParameterMap, RequestPartitionId theRequestPartitionId) {
final String searchUuid = UUID.randomUUID().toString();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(theResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
final ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(callingDao, theResourceType, resourceTypeClass);
sb.setFetchSize(mySyncSize);
return executeQuery(theSearchParameterMap, null, searchUuid, sb, theSearchParameterMap.getLoadSynchronousUpTo(), theRequestPartitionId);
}
@Autowired
public void setContext(FhirContext theContext) {
myContext = theContext;
}
private int getTotalCount(Integer queryCount, Integer offset, int queryResultCount) {
if (queryCount != null) {
if (offset != null) {
return offset + queryResultCount;
} else {
return queryResultCount;
}
} else {
return queryResultCount;
}
}
private Integer getQueryCount(Integer theLoadSynchronousUpTo, SearchParameterMap theParams) {
if (theLoadSynchronousUpTo != null) {
return theLoadSynchronousUpTo;
} else if (theParams.getCount() != null) {
return theParams.getCount();
} else if (myDaoConfig.getFetchSizeDefaultMaximum() != null) {
return myDaoConfig.getFetchSizeDefaultMaximum();
}
return null;
}
}

View File

@ -75,7 +75,6 @@ import ca.uhn.fhir.model.valueset.BundleEntrySearchModeEnum;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
import ca.uhn.fhir.rest.api.SearchContainedModeEnum; import ca.uhn.fhir.rest.api.SearchContainedModeEnum;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.rest.api.SortOrderEnum; import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec; import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
@ -93,12 +92,10 @@ import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.StringUtil; import ca.uhn.fhir.util.StringUtil;
import ca.uhn.fhir.util.UrlUtil; import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import com.healthmarketscience.sqlbuilder.Condition; import com.healthmarketscience.sqlbuilder.Condition;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.apache.jena.sparql.engine.QueryIterator;
import org.hl7.fhir.instance.model.api.IAnyResource; import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -121,7 +118,6 @@ import javax.persistence.criteria.From;
import javax.persistence.criteria.Predicate; import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root; import javax.persistence.criteria.Root;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;

View File

@ -0,0 +1,24 @@
package ca.uhn.fhir.jpa.util;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.rest.api.SummaryEnum;
import static ca.uhn.fhir.jpa.searchparam.SearchParameterMap.INTEGER_0;
public class SearchParameterMapCalculator {
static public boolean isWantCount(SearchParameterMap myParams) {
return isWantCount(myParams.getSearchTotalMode());
}
static public boolean isWantCount(SearchTotalModeEnum theSearchTotalModeEnum){
return SearchTotalModeEnum.ACCURATE.equals(theSearchTotalModeEnum);
}
static public boolean isWantOnlyCount(SearchParameterMap myParams) {
return SummaryEnum.COUNT.equals(myParams.getSummaryMode())
| INTEGER_0.equals(myParams.getCount());
}
}

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
@ -78,6 +79,10 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID; import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -102,6 +107,9 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
@Autowired @Autowired
private ISchedulerService mySchedulerService; private ISchedulerService mySchedulerService;
@Autowired
private ISearchSvc mySearchService;
@Override @Override
public IBaseParameters triggerSubscription(List<IPrimitiveType<String>> theResourceIds, List<IPrimitiveType<String>> theSearchUrls, @IdParam IIdType theSubscriptionId) { public IBaseParameters triggerSubscription(List<IPrimitiveType<String>> theResourceIds, List<IPrimitiveType<String>> theSearchUrls, @IdParam IIdType theSubscriptionId) {
@ -119,8 +127,8 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartitions()); subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartitions());
} }
List<IPrimitiveType<String>> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList()); List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList());
List<IPrimitiveType<String>> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList()); List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList());
// Make sure we have at least one resource ID or search URL // Make sure we have at least one resource ID or search URL
if (resourceIds.size() == 0 && searchUrls.size() == 0) { if (resourceIds.size() == 0 && searchUrls.size() == 0) {
@ -182,7 +190,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
// If the job is complete, remove it from the queue // If the job is complete, remove it from the queue
if (activeJob.getRemainingResourceIds().isEmpty()) { if (activeJob.getRemainingResourceIds().isEmpty()) {
if (activeJob.getRemainingSearchUrls().isEmpty()) { if (activeJob.getRemainingSearchUrls().isEmpty()) {
if (isBlank(activeJob.myCurrentSearchUuid)) { if (jobHasCompleted(activeJob)) {
myActiveJobs.remove(0); myActiveJobs.remove(0);
String remainingJobsMsg = ""; String remainingJobsMsg = "";
if (myActiveJobs.size() > 0) { if (myActiveJobs.size() > 0) {
@ -216,26 +224,106 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return; return;
} }
// If we don't have an active search started, and one needs to be.. start it IBundleProvider search = null;
if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
// This is the job initial step where we set ourselves up to do the actual re-submitting of resources
// to the broker. Note that querying of resource can be done synchronously or asynchronously
if ( isInitialStep(theJobDetails) && isNotEmpty(theJobDetails.getRemainingSearchUrls()) && totalSubmitted < myMaxSubmitPerPass){
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl); RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?')); String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
String resourceType = resourceDef.getName();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
String resourceType = resourceDef.getName();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null, RequestPartitionId.allPartitions()); search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null, RequestPartitionId.allPartitions());
theJobDetails.setCurrentSearchUuid(search.getUuid());
if (isNull(search.getUuid())) {
// we don't have a search uuid i.e. we're setting up for synchronous processing
theJobDetails.setCurrentSearchUrl(nextSearchUrl);
theJobDetails.setCurrentOffset(params.getOffset());
} else {
// populate properties for asynchronous path
theJobDetails.setCurrentSearchUuid(search.getUuid());
}
theJobDetails.setCurrentSearchResourceType(resourceType); theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount()); theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchLastUploadedIndex(-1); theJobDetails.setCurrentSearchLastUploadedIndex(-1);
} }
// If we have an active search going, submit resources from it // processing step for synchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted < myMaxSubmitPerPass) {
List<IBaseResource> allCurrentResources;
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex );
int submittableCount = myMaxSubmitPerPass - totalSubmitted;
int toIndex = fromIndex + submittableCount;
if (nonNull(search) && !search.isEmpty()) {
// we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
allCurrentResources = search.getResources(0, toIndex);
} else {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
int offset = theJobDetails.getCurrentOffset() + fromIndex;
params.setOffset(offset);
params.setCount(toIndex);
ourLog.info("Triggered job[{}] requesting {} resources from offset {}", theJobDetails.getJobId(), toIndex, offset);
search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
allCurrentResources = search.getAllResources();
}
ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
for (IBaseResource nextResource : allCurrentResources) {
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted++;
highestIndexSubmitted++;
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
ourLog.info("Triggered job[{}] lastUploadedIndex is {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchLastUploadedIndex());
if (allCurrentResources.isEmpty() || nonNull(theJobDetails.getCurrentSearchCount()) && toIndex >= theJobDetails.getCurrentSearchCount()) {
ourLog.info("Triggered job[{}] for search URL {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUrl());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.clearCurrentSearchUrl();
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
// processing step for asynchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
@ -278,6 +366,14 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
} }
private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl);
}
private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails){
return isInitialStep(theJobDetails);
}
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) { private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
for (Pair<String, Future<Void>> next : theIdToFutures) { for (Pair<String, Future<Void>> next : theIdToFutures) {
@ -419,9 +515,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
private List<String> myRemainingResourceIds; private List<String> myRemainingResourceIds;
private List<String> myRemainingSearchUrls; private List<String> myRemainingSearchUrls;
private String myCurrentSearchUuid; private String myCurrentSearchUuid;
private String myCurrentSearchUrl;
private Integer myCurrentSearchCount; private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType; private String myCurrentSearchResourceType;
private int myCurrentSearchLastUploadedIndex; private int myCurrentSearchLastUploadedIndex;
private int myCurrentOffset;
Integer getCurrentSearchCount() { Integer getCurrentSearchCount() {
return myCurrentSearchCount; return myCurrentSearchCount;
@ -479,6 +577,14 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
myCurrentSearchUuid = theCurrentSearchUuid; myCurrentSearchUuid = theCurrentSearchUuid;
} }
public String getCurrentSearchUrl() {
return myCurrentSearchUrl;
}
public void setCurrentSearchUrl(String theCurrentSearchUrl) {
this.myCurrentSearchUrl = theCurrentSearchUrl;
}
int getCurrentSearchLastUploadedIndex() { int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex; return myCurrentSearchLastUploadedIndex;
} }
@ -486,6 +592,18 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) { void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
} }
public void clearCurrentSearchUrl(){
myCurrentSearchUrl = null;
}
public int getCurrentOffset(){
return myCurrentOffset;
}
public void setCurrentOffset(Integer theCurrentOffset) {
myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0);
}
} }
} }

View File

@ -0,0 +1,117 @@
package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.LegacySearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.util.BaseIterator;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.mockito.Mock;
import org.mockito.stubbing.Answer;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.verify;
public class BaseSearchSvc {
protected int myExpectedNumberOfSearchBuildersCreated = 2;
@Mock
protected SearchBuilderFactory mySearchBuilderFactory;
@Mock
protected PlatformTransactionManager myTxManager;
@Mock
protected LegacySearchBuilder mySearchBuilder;
@Mock
protected IFhirResourceDao<?> myCallingDao;
@Mock
protected DaoRegistry myDaoRegistry;
@Mock
protected DaoConfig myDaoConfig;
protected static final FhirContext ourCtx = FhirContext.forDstu3Cached();
public void after() {
verify(mySearchBuilderFactory, atMost(myExpectedNumberOfSearchBuildersCreated)).newSearchBuilder(any(), any(), any());
}
protected List<ResourcePersistentId> createPidSequence(int to) {
List<ResourcePersistentId> pids = new ArrayList<>();
for (long i = 10; i < to; i++) {
pids.add(new ResourcePersistentId(i));
}
return pids;
}
protected Answer<Void> loadPids() {
return theInvocation -> {
List<ResourcePersistentId> pids = (List<ResourcePersistentId>) theInvocation.getArguments()[0];
List<IBaseResource> resources = (List<IBaseResource>) theInvocation.getArguments()[2];
for (ResourcePersistentId nextPid : pids) {
Patient pt = new Patient();
pt.setId(nextPid.toString());
resources.add(pt);
}
return null;
};
}
public static class ResultIterator extends BaseIterator<ResourcePersistentId> implements IResultIterator {
private final Iterator<ResourcePersistentId> myWrap;
private int myCount;
ResultIterator(Iterator<ResourcePersistentId> theWrap) {
myWrap = theWrap;
}
@Override
public boolean hasNext() {
return myWrap.hasNext();
}
@Override
public ResourcePersistentId next() {
myCount++;
return myWrap.next();
}
@Override
public int getSkippedCount() {
return 0;
}
@Override
public int getNonSkippedCount() {
return myCount;
}
@Override
public Collection<ResourcePersistentId> getNextResultBatch(long theBatchSize) {
Collection<ResourcePersistentId> batch = new ArrayList<>();
while (this.hasNext() && batch.size() < theBatchSize) {
batch.add(this.next());
}
return batch;
}
@Override
public void close() {
// nothing
}
}
}

View File

@ -1,14 +1,10 @@
package ca.uhn.fhir.jpa.search; package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.LegacySearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum; import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
@ -18,16 +14,13 @@ import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.BaseIterator; import ca.uhn.fhir.jpa.util.BaseIterator;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import com.google.common.collect.Lists;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -37,15 +30,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -63,7 +53,6 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
@ -75,7 +64,6 @@ import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -83,28 +71,16 @@ import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class SearchCoordinatorSvcImplTest { public class SearchCoordinatorSvcImplTest extends BaseSearchSvc{
private static final Logger ourLog = LoggerFactory.getLogger(SearchCoordinatorSvcImplTest.class); private static final Logger ourLog = LoggerFactory.getLogger(SearchCoordinatorSvcImplTest.class);
private static final FhirContext ourCtx = FhirContext.forDstu3Cached();
@Mock
private IFhirResourceDao<?> myCallingDao;
@Mock
private EntityManager myEntityManager;
private int myExpectedNumberOfSearchBuildersCreated = 2;
@Mock
private LegacySearchBuilder mySearchBuilder;
@Mock @Mock
private ISearchCacheSvc mySearchCacheSvc; private ISearchCacheSvc mySearchCacheSvc;
@Mock @Mock
private ISearchResultCacheSvc mySearchResultCacheSvc; private ISearchResultCacheSvc mySearchResultCacheSvc;
private SearchCoordinatorSvcImpl mySvc; private SearchCoordinatorSvcImpl mySvc;
@Mock
private PlatformTransactionManager myTxManager;
private Search myCurrentSearch; private Search myCurrentSearch;
@Mock @Mock
private DaoRegistry myDaoRegistry;
@Mock
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@Mock @Mock
private SearchBuilderFactory mySearchBuilderFactory; private SearchBuilderFactory mySearchBuilderFactory;
@ -112,12 +88,13 @@ public class SearchCoordinatorSvcImplTest {
private PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; private PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
@Mock @Mock
private IRequestPartitionHelperSvc myPartitionHelperSvc; private IRequestPartitionHelperSvc myPartitionHelperSvc;
@Mock
private ISynchronousSearchSvc mySynchronousSearchSvc;
@AfterEach @AfterEach
public void after() { public void after() {
System.clearProperty(SearchCoordinatorSvcImpl.UNIT_TEST_CAPTURE_STACK); System.clearProperty(SearchCoordinatorSvcImpl.UNIT_TEST_CAPTURE_STACK);
super.after();
verify(mySearchBuilderFactory, atMost(myExpectedNumberOfSearchBuildersCreated)).newSearchBuilder(any(), any(), any());
} }
@BeforeEach @BeforeEach
@ -127,7 +104,6 @@ public class SearchCoordinatorSvcImplTest {
myCurrentSearch = null; myCurrentSearch = null;
mySvc = new SearchCoordinatorSvcImpl(); mySvc = new SearchCoordinatorSvcImpl();
mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager); mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx); mySvc.setContextForUnitTest(ourCtx);
mySvc.setSearchCacheServicesForUnitTest(mySearchCacheSvc, mySearchResultCacheSvc); mySvc.setSearchCacheServicesForUnitTest(mySearchCacheSvc, mySearchResultCacheSvc);
@ -136,33 +112,13 @@ public class SearchCoordinatorSvcImplTest {
mySvc.setSearchBuilderFactoryForUnitTest(mySearchBuilderFactory); mySvc.setSearchBuilderFactoryForUnitTest(mySearchBuilderFactory);
mySvc.setPersistedJpaBundleProviderFactoryForUnitTest(myPersistedJpaBundleProviderFactory); mySvc.setPersistedJpaBundleProviderFactoryForUnitTest(myPersistedJpaBundleProviderFactory);
mySvc.setRequestPartitionHelperService(myPartitionHelperSvc); mySvc.setRequestPartitionHelperService(myPartitionHelperSvc);
mySvc.setSynchronousSearchSvc(mySynchronousSearchSvc);
DaoConfig daoConfig = new DaoConfig(); DaoConfig daoConfig = new DaoConfig();
mySvc.setDaoConfigForUnitTest(daoConfig); mySvc.setDaoConfigForUnitTest(daoConfig);
} }
private List<ResourcePersistentId> createPidSequence(int to) {
List<ResourcePersistentId> pids = new ArrayList<>();
for (long i = 10; i < to; i++) {
pids.add(new ResourcePersistentId(i));
}
return pids;
}
private Answer<Void> loadPids() {
return theInvocation -> {
List<ResourcePersistentId> pids = (List<ResourcePersistentId>) theInvocation.getArguments()[0];
List<IBaseResource> resources = (List<IBaseResource>) theInvocation.getArguments()[2];
for (ResourcePersistentId nextPid : pids) {
Patient pt = new Patient();
pt.setId(nextPid.toString());
resources.add(pt);
}
return null;
};
}
@Test @Test
public void testAsyncSearchFailDuringSearchSameCoordinator() { public void testAsyncSearchFailDuringSearchSameCoordinator() {
initSearches(); initSearches();
@ -528,76 +484,41 @@ public class SearchCoordinatorSvcImplTest {
@Test @Test
public void testSynchronousSearch() { public void testSynchronousSearch() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder); when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
when(myTxManager.getTransaction(any())).thenReturn(mock(TransactionStatus.class));
SearchParameterMap params = new SearchParameterMap(); SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronous(true); params.setLoadSynchronous(true);
params.add("name", new StringParam("ANAME"));
List<ResourcePersistentId> pids = createPidSequence(800); mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(new ResultIterator(pids.iterator()));
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any()); verify(mySynchronousSearchSvc).executeQuery(any(), any(), any(), any(), any(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(790, Objects.requireNonNull(result.size()).intValue());
List<IBaseResource> resources = result.getResources(0, 10000);
assertEquals(790, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("799", resources.get(789).getIdElement().getValueAsString());
} }
@Test @Test
public void testSynchronousSearchWithOffset() { public void testSynchronousSearchWithOffset() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder); when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
SearchParameterMap params = new SearchParameterMap(); SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronous(true);
params.add("name", new StringParam("ANAME"));
params.setCount(10);
params.setOffset(10); params.setOffset(10);
params.setSearchTotalMode(SearchTotalModeEnum.ACCURATE); params.setCount(10);
List<ResourcePersistentId> pids = createPidSequence(30); mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
when(mySearchBuilder.createCountQuery(same(params), any(String.class),nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(20L);
when(mySearchBuilder.createQuery(same(params), any(), nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(new ResultIterator(pids.subList(10, 20).iterator()));
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any()); verify(mySynchronousSearchSvc).executeQuery(any(), any(), any(), any(), any(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(20, result.size().intValue());
List<IBaseResource> resources = result.getResources(0, 10);
assertEquals(10, resources.size());
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
} }
@Test @Test
public void testSynchronousSearchUpTo() { public void testSynchronousSearchUpTo() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder); when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
when(myTxManager.getTransaction(any())).thenReturn(mock(TransactionStatus.class));
int loadUpto = 30;
SearchParameterMap params = new SearchParameterMap(); SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(100); CacheControlDirective cacheControlDirective = new CacheControlDirective().setMaxResults(loadUpto).setNoStore(true);
params.add("name", new StringParam("ANAME"));
List<ResourcePersistentId> pids = createPidSequence(800); mySvc.registerSearch(myCallingDao, params, "Patient", cacheControlDirective, null, RequestPartitionId.allPartitions());
when(mySearchBuilder.createQuery(same(params), any(), nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(new ResultIterator(pids.iterator()));
pids = createPidSequence(110); verify(mySynchronousSearchSvc).executeQuery(any(), any(), any(), any(), eq(30), any());
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(eq(pids), any(Collection.class), any(List.class), anyBoolean(), nullable(RequestDetails.class));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(100, Objects.requireNonNull(result.size()).intValue());
List<IBaseResource> resources = result.getResources(0, 10000);
assertEquals(100, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("109", resources.get(99).getIdElement().getValueAsString());
} }
/** /**
@ -704,51 +625,6 @@ public class SearchCoordinatorSvcImplTest {
} }
} }
public static class ResultIterator extends BaseIterator<ResourcePersistentId> implements IResultIterator {
private final Iterator<ResourcePersistentId> myWrap;
private int myCount;
ResultIterator(Iterator<ResourcePersistentId> theWrap) {
myWrap = theWrap;
}
@Override
public boolean hasNext() {
return myWrap.hasNext();
}
@Override
public ResourcePersistentId next() {
myCount++;
return myWrap.next();
}
@Override
public int getSkippedCount() {
return 0;
}
@Override
public int getNonSkippedCount() {
return myCount;
}
@Override
public Collection<ResourcePersistentId> getNextResultBatch(long theBatchSize) {
Collection<ResourcePersistentId> batch = new ArrayList<>();
while (this.hasNext() && batch.size() < theBatchSize) {
batch.add(this.next());
}
return batch;
}
@Override
public void close() {
// nothing
}
}
/** /**
* THIS CLASS IS FOR UNIT TESTS ONLY - It is delioberately inefficient * THIS CLASS IS FOR UNIT TESTS ONLY - It is delioberately inefficient
* and keeps things in memory. * and keeps things in memory.

View File

@ -0,0 +1,110 @@
package ca.uhn.fhir.jpa.search;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.TransactionStatus;
import java.util.Collection;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SynchronousSearchSvcImplTest extends BaseSearchSvc{
@InjectMocks
private SynchronousSearchSvcImpl mySynchronousSearchSvc;
@BeforeEach
public void before() {
mySynchronousSearchSvc.setContext(ourCtx);
}
@Test
public void testSynchronousSearch() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
when(myTxManager.getTransaction(any())).thenReturn(mock(TransactionStatus.class));
SearchParameterMap params = new SearchParameterMap();
List<ResourcePersistentId> pids = createPidSequence(800);
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(new BaseSearchSvc.ResultIterator(pids.iterator()));
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySynchronousSearchSvc.executeQuery( "Patient", params, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertFalse(result.isEmpty());
List<IBaseResource> resources = result.getResources(0, 1000);
assertEquals(790, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("799", resources.get(789).getIdElement().getValueAsString());
}
@Test
public void testSynchronousSearchWithOffset() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
SearchParameterMap params = new SearchParameterMap();
params.setCount(10);
params.setOffset(10);
params.setSearchTotalMode(SearchTotalModeEnum.ACCURATE);
List<ResourcePersistentId> pids = createPidSequence(30);
when(mySearchBuilder.createCountQuery(same(params), any(String.class),nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(20L);
when(mySearchBuilder.createQuery(same(params), any(), nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(new BaseSearchSvc.ResultIterator(pids.subList(10, 20).iterator()));
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySynchronousSearchSvc.executeQuery("Patient", params, RequestPartitionId.allPartitions());
List<IBaseResource> resources = result.getResources(0, 1000);
assertEquals(10, resources.size());
assertEquals("20", resources.get(0).getIdElement().getValueAsString());
}
@Test
public void testSynchronousSearchUpTo() {
when(mySearchBuilderFactory.newSearchBuilder(any(), any(), any())).thenReturn(mySearchBuilder);
when(myTxManager.getTransaction(any())).thenReturn(mock(TransactionStatus.class));
when(myDaoConfig.getDefaultTotalMode()).thenReturn(null);
SearchParameterMap params = new SearchParameterMap();
params.setLoadSynchronousUpTo(100);
List<ResourcePersistentId> pids = createPidSequence(800);
when(mySearchBuilder.createQuery(same(params), any(), nullable(RequestDetails.class), nullable(RequestPartitionId.class))).thenReturn(new BaseSearchSvc.ResultIterator(pids.iterator()));
pids = createPidSequence(110);
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(eq(pids), any(Collection.class), any(List.class), anyBoolean(), nullable(RequestDetails.class));
IBundleProvider result = mySynchronousSearchSvc.executeQuery("Patient", params, RequestPartitionId.allPartitions());
List<IBaseResource> resources = result.getResources(0, 1000);
assertEquals(100, resources.size());
assertEquals("10", resources.get(0).getIdElement().getValueAsString());
assertEquals("109", resources.get(99).getIdElement().getValueAsString());
}
}

View File

@ -2,13 +2,15 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl; import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.util.ForceSynchronousSearchInterceptor;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.annotation.Update;
@ -39,7 +41,10 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -51,6 +56,7 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
/** /**
@ -70,13 +76,14 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
private static final List<Patient> ourUpdatedPatients = Collections.synchronizedList(Lists.newArrayList()); private static final List<Patient> ourUpdatedPatients = Collections.synchronizedList(Lists.newArrayList());
private static final List<String> ourContentTypes = Collections.synchronizedList(Lists.newArrayList()); private static final List<String> ourContentTypes = Collections.synchronizedList(Lists.newArrayList());
private final List<IIdType> mySubscriptionIds = Collections.synchronizedList(Lists.newArrayList()); private final List<IIdType> mySubscriptionIds = Collections.synchronizedList(Lists.newArrayList());
@Autowired @Autowired
private SubscriptionTestUtil mySubscriptionTestUtil; private SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired @Autowired
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc; private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
@Autowired @Autowired
private ISchedulerService mySchedulerService; private ISchedulerService mySchedulerService;
@Autowired
private IInterceptorService myInterceptorService;
@AfterEach @AfterEach
public void afterUnregisterRestHookListener() { public void afterUnregisterRestHookListener() {
@ -269,14 +276,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
String payload = "application/fhir+json"; String payload = "application/fhir+json";
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
// Create lots createPatientsAndWait(10);
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("P" + i);
p.addName().setFamily("P" + i);
ourClient.update().resource(p).execute();
}
waitForSize(10, ourUpdatedPatients);
// Use multiple strings // Use multiple strings
beforeReset(); beforeReset();
@ -316,13 +316,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
// Create lots // Create lots
for (int i = 0; i < 10; i++) { createPatientsAndWait(10);
Patient p = new Patient();
p.setId("P" + i);
p.addName().setFamily("P" + i);
ourClient.update().resource(p).execute();
}
waitForSize(10, ourUpdatedPatients);
// Use a single // Use a single
beforeReset(); beforeReset();
@ -335,6 +329,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
String responseValue = response.getParameter().get(0).getValue().primitiveValue(); String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass(); mySubscriptionTriggeringSvc.runDeliveryPass();
waitForSize(0, ourCreatedPatients); waitForSize(0, ourCreatedPatients);
@ -494,6 +489,97 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
} }
@Nested
@DisplayName("Testing subscription triggering in synchronous query mode")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestSubscriptionTriggeringInSynchronousQueryMode{
ForceSynchronousSearchInterceptor forceSynchronousSearchInterceptor = new ForceSynchronousSearchInterceptor();
@BeforeAll
public void beforeAllTests(){
myInterceptorService.registerInterceptor(forceSynchronousSearchInterceptor);
}
@AfterAll
public void afterAllTests(){
myInterceptorService.unregisterInterceptor(forceSynchronousSearchInterceptor);
}
@Test
public void testTriggerSubscriptionInSynchronousQueryMode() throws Exception {
((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(10);
String payload = "application/fhir+json";
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
int numberOfPatient = 15;
// Create lots
createPatientsAndWait(numberOfPatient);
List<String> submittedPatientIds = ourUpdatedPatients.stream().map(patient -> patient.getId()).collect(Collectors.toList());
// Use a trigger subscription
beforeReset();
Parameters response = ourClient
.operation()
.onInstance(sub2id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?"))
.execute();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForSize(0, ourCreatedPatients);
waitForSize(numberOfPatient, ourUpdatedPatients);
List<String> resubmittedPatientIds = ourUpdatedPatients.stream().map(patient -> patient.getId()).collect(Collectors.toList());
assertTrue(resubmittedPatientIds.size() == submittedPatientIds.size());
assertTrue(resubmittedPatientIds.containsAll(submittedPatientIds));
}
@Test
public void testTriggerSubscriptionInSynchronousQueryModeWithOffset() throws Exception {
((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(10);
ForceSynchronousSearchInterceptor forceSynchronousSearchInterceptor = new ForceSynchronousSearchInterceptor();
String payload = "application/fhir+json";
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
int numberOfPatient = 15;
int offset = 5;
// Create lots
createPatientsAndWait(numberOfPatient);
List<String> submittedPatientIds = ourUpdatedPatients.stream().map(patient -> patient.getId()).collect(Collectors.toList());
List<String> expectedPatientIds = submittedPatientIds.subList(offset, submittedPatientIds.size());
// Use a trigger subscription
beforeReset();
Parameters response = ourClient
.operation()
.onInstance(sub2id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?_offset=" + offset))
.execute();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
waitForSize(0, ourCreatedPatients);
waitForSize(numberOfPatient - offset, ourUpdatedPatients);
List<String> resubmittedPatientIds = ourUpdatedPatients.stream().map(patient -> patient.getId()).collect(Collectors.toList());
assertTrue(resubmittedPatientIds.size() == expectedPatientIds.size());
assertTrue(resubmittedPatientIds.containsAll(expectedPatientIds));
}
}
@Override @Override
protected boolean shouldLogClient() { protected boolean shouldLogClient() {
return false; return false;
@ -581,4 +667,16 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
JettyUtil.closeServer(ourListenerServer); JettyUtil.closeServer(ourListenerServer);
} }
private void createPatientsAndWait(int numberOfPatient) {
for (int i = 0; i < numberOfPatient; i++) {
Patient p = new Patient();
p.setId("P" + i);
p.addName().setFamily("P" + i);
ourClient.update().resource(p).execute();
}
waitForSize(numberOfPatient, ourUpdatedPatients);
}
} }

View File

@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.util;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
public class ForceSynchronousSearchInterceptor {
@Hook(Pointcut.STORAGE_PRESEARCH_REGISTERED)
public void storagePreSearchRegistered(SearchParameterMap theMap) {
theMap.setLoadSynchronous(true);
}
}

View File

@ -0,0 +1,11 @@
package ca.uhn.fhir.jpa.api.svc;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
public interface ISearchSvc {
IBundleProvider executeQuery(String theResourceType, SearchParameterMap theSearchParameterMap, RequestPartitionId theRequestPartitionId);
}