Merge branch 'master' of github.com:jamesagnew/hapi-fhir
This commit is contained in:
commit
ed78a5f06d
|
@ -17,5 +17,3 @@ http://hapi.fhir.org/
|
||||||
This project is Open Source, licensed under the Apache Software License 2.0.
|
This project is Open Source, licensed under the Apache Software License 2.0.
|
||||||
|
|
||||||
Please see [this wiki page](https://github.com/jamesagnew/hapi-fhir/wiki/Getting-Help) for information on where to get help with HAPI FHIR. Please see [Smile CDR](https://smilecdr.com) for information on commercial support.
|
Please see [this wiki page](https://github.com/jamesagnew/hapi-fhir/wiki/Getting-Help) for information on where to get help with HAPI FHIR. Please see [Smile CDR](https://smilecdr.com) for information on commercial support.
|
||||||
|
|
||||||
---
|
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
# Starter pipeline
|
# HAPI FHIR Build Pipeline
|
||||||
# Start with a minimal pipeline that you can customize to build and deploy your code.
|
|
||||||
# Add steps that build, run tests, deploy, and more:
|
|
||||||
# https://aka.ms/yaml
|
|
||||||
|
|
||||||
variables:
|
variables:
|
||||||
MAVEN_CACHE_FOLDER: $(Pipeline.Workspace)/.m2/repository
|
MAVEN_CACHE_FOLDER: $(Pipeline.Workspace)/.m2/repository
|
||||||
|
@ -13,7 +10,6 @@ trigger:
|
||||||
pool:
|
pool:
|
||||||
vmImage: 'ubuntu-latest'
|
vmImage: 'ubuntu-latest'
|
||||||
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
- job: Build
|
- job: Build
|
||||||
timeoutInMinutes: 360
|
timeoutInMinutes: 360
|
||||||
|
@ -23,37 +19,37 @@ jobs:
|
||||||
key: maven
|
key: maven
|
||||||
path: $(MAVEN_CACHE_FOLDER)
|
path: $(MAVEN_CACHE_FOLDER)
|
||||||
displayName: Cache Maven local repo
|
displayName: Cache Maven local repo
|
||||||
|
|
||||||
- task: Maven@3
|
- task: Maven@3
|
||||||
inputs:
|
inputs:
|
||||||
#mavenPomFile: 'pom.xml'
|
goals: 'clean install'
|
||||||
goals: 'clean install' # Optional
|
# These are Maven CLI options (and show up in the build logs) - "-nsu"=Don't update snapshots. We can remove this when Maven OSS is more healthy
|
||||||
options: '-P ALLMODULES,JACOCO'
|
options: '-P ALLMODULES,JACOCO -nsu'
|
||||||
#publishJUnitResults: true
|
# These are JVM options (and don't show up in the build logs)
|
||||||
#testResultsFiles: '**/surefire-reports/TEST-*.xml' # Required when publishJUnitResults == True
|
mavenOptions: '-Xmx2048m $(MAVEN_OPTS) -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss,SSS -Duser.timezone=America/Toronto'
|
||||||
#testRunTitle: # Optional
|
|
||||||
#codeCoverageToolOption: 'None' # Optional. Options: none, cobertura, jaCoCo. Enabling code coverage inserts the `clean` goal into the Maven goals list when Maven runs.
|
|
||||||
#codeCoverageClassFilter: # Optional. Comma-separated list of filters to include or exclude classes from collecting code coverage. For example: +:com.*,+:org.*,-:my.app*.*
|
|
||||||
#codeCoverageClassFilesDirectories: # Optional
|
|
||||||
#codeCoverageSourceDirectories: # Optional
|
|
||||||
#codeCoverageFailIfEmpty: false # Optional
|
|
||||||
#javaHomeOption: 'JDKVersion' # Options: jDKVersion, path
|
|
||||||
#jdkVersionOption: 'default' # Optional. Options: default, 1.11, 1.10, 1.9, 1.8, 1.7, 1.6
|
|
||||||
#jdkDirectory: # Required when javaHomeOption == Path
|
|
||||||
#jdkArchitectureOption: 'x64' # Optional. Options: x86, x64
|
|
||||||
#mavenVersionOption: 'Default' # Options: default, path
|
|
||||||
#mavenDirectory: # Required when mavenVersionOption == Path
|
|
||||||
#mavenSetM2Home: false # Required when mavenVersionOption == Path
|
|
||||||
mavenOptions: '-Xmx2048m $(MAVEN_OPTS) -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss,SSS' # Optional
|
|
||||||
#mavenAuthenticateFeed: false
|
|
||||||
#effectivePomSkip: false
|
|
||||||
#sonarQubeRunAnalysis: false
|
|
||||||
#sqMavenPluginVersionChoice: 'latest' # Required when sonarQubeRunAnalysis == True# Options: latest, pom
|
|
||||||
#checkStyleRunAnalysis: false # Optional
|
|
||||||
#pmdRunAnalysis: false # Optional
|
|
||||||
#findBugsRunAnalysis: false # Optional
|
|
||||||
|
|
||||||
- script: bash <(curl https://codecov.io/bash) -t $(CODECOV_TOKEN)
|
- script: bash <(curl https://codecov.io/bash) -t $(CODECOV_TOKEN)
|
||||||
displayName: 'codecov'
|
displayName: 'codecov'
|
||||||
|
|
||||||
|
|
||||||
|
# Potential Additional Maven3 Options:
|
||||||
|
#publishJUnitResults: true
|
||||||
|
#testResultsFiles: '**/surefire-reports/TEST-*.xml' # Required when publishJUnitResults == True
|
||||||
|
#testRunTitle: # Optional
|
||||||
|
#codeCoverageToolOption: 'None' # Optional. Options: none, cobertura, jaCoCo. Enabling code coverage inserts the `clean` goal into the Maven goals list when Maven runs.
|
||||||
|
#codeCoverageClassFilter: # Optional. Comma-separated list of filters to include or exclude classes from collecting code coverage. For example: +:com.*,+:org.*,-:my.app*.*
|
||||||
|
#codeCoverageClassFilesDirectories: # Optional
|
||||||
|
#codeCoverageSourceDirectories: # Optional
|
||||||
|
#codeCoverageFailIfEmpty: false # Optional
|
||||||
|
#javaHomeOption: 'JDKVersion' # Options: jDKVersion, path
|
||||||
|
#jdkVersionOption: 'default' # Optional. Options: default, 1.11, 1.10, 1.9, 1.8, 1.7, 1.6
|
||||||
|
#jdkDirectory: # Required when javaHomeOption == Path
|
||||||
|
#jdkArchitectureOption: 'x64' # Optional. Options: x86, x64
|
||||||
|
#mavenVersionOption: 'Default' # Options: default, path
|
||||||
|
#mavenDirectory: # Required when mavenVersionOption == Path
|
||||||
|
#mavenSetM2Home: false # Required when mavenVersionOption == Path
|
||||||
|
#mavenAuthenticateFeed: false
|
||||||
|
#effectivePomSkip: false
|
||||||
|
#sonarQubeRunAnalysis: false
|
||||||
|
#sqMavenPluginVersionChoice: 'latest' # Required when sonarQubeRunAnalysis == True# Options: latest, pom
|
||||||
|
#checkStyleRunAnalysis: false # Optional
|
||||||
|
#pmdRunAnalysis: false # Optional
|
||||||
|
#findBugsRunAnalysis: false # Optional
|
||||||
|
|
|
@ -53,21 +53,6 @@ public class Search implements ICachedSearchDetails, Serializable {
|
||||||
private static final int FAILURE_MESSAGE_LENGTH = 500;
|
private static final int FAILURE_MESSAGE_LENGTH = 500;
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(Search.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(Search.class);
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return new ToStringBuilder(this)
|
|
||||||
.append("myLastUpdatedHigh", myLastUpdatedHigh)
|
|
||||||
.append("myLastUpdatedLow", myLastUpdatedLow)
|
|
||||||
.append("myNumFound", myNumFound)
|
|
||||||
.append("myNumBlocked", myNumBlocked)
|
|
||||||
.append("myStatus", myStatus)
|
|
||||||
.append("myTotalCount", myTotalCount)
|
|
||||||
.append("myUuid", myUuid)
|
|
||||||
.append("myVersion", myVersion)
|
|
||||||
.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Temporal(TemporalType.TIMESTAMP)
|
@Temporal(TemporalType.TIMESTAMP)
|
||||||
@Column(name = "CREATED", nullable = false, updatable = false)
|
@Column(name = "CREATED", nullable = false, updatable = false)
|
||||||
private Date myCreated;
|
private Date myCreated;
|
||||||
|
@ -139,6 +124,20 @@ public class Search implements ICachedSearchDetails, Serializable {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new ToStringBuilder(this)
|
||||||
|
.append("myLastUpdatedHigh", myLastUpdatedHigh)
|
||||||
|
.append("myLastUpdatedLow", myLastUpdatedLow)
|
||||||
|
.append("myNumFound", myNumFound)
|
||||||
|
.append("myNumBlocked", myNumBlocked)
|
||||||
|
.append("myStatus", myStatus)
|
||||||
|
.append("myTotalCount", myTotalCount)
|
||||||
|
.append("myUuid", myUuid)
|
||||||
|
.append("myVersion", myVersion)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
|
||||||
public int getNumBlocked() {
|
public int getNumBlocked() {
|
||||||
return myNumBlocked != null ? myNumBlocked : 0;
|
return myNumBlocked != null ? myNumBlocked : 0;
|
||||||
}
|
}
|
||||||
|
@ -351,8 +350,8 @@ public class Search implements ICachedSearchDetails, Serializable {
|
||||||
return myVersion;
|
return myVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SearchParameterMap getSearchParameterMap() {
|
public Optional<SearchParameterMap> getSearchParameterMap() {
|
||||||
return SerializationUtils.deserialize(mySearchParameterMap);
|
return Optional.ofNullable(mySearchParameterMap).map(t -> SerializationUtils.deserialize(mySearchParameterMap));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSearchParameterMap(SearchParameterMap theSearchParameterMap) {
|
public void setSearchParameterMap(SearchParameterMap theSearchParameterMap) {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
public interface ISearchCoordinatorSvc {
|
public interface ISearchCoordinatorSvc {
|
||||||
|
|
||||||
|
@ -37,4 +38,10 @@ public interface ISearchCoordinatorSvc {
|
||||||
|
|
||||||
IBundleProvider registerSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, @Nullable RequestDetails theRequestDetails);
|
IBundleProvider registerSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, @Nullable RequestDetails theRequestDetails);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch the total number of search results for the given currently executing search, if one is currently executing and
|
||||||
|
* the total is known. Will return empty otherwise
|
||||||
|
*/
|
||||||
|
Optional<Integer> getSearchTotal(String theUuid);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,12 +191,12 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
|
||||||
if (mySearchEntity == null) {
|
if (mySearchEntity == null) {
|
||||||
ensureDependenciesInjected();
|
ensureDependenciesInjected();
|
||||||
|
|
||||||
Optional<Search> search = mySearchCacheSvc.fetchByUuid(myUuid);
|
Optional<Search> searchOpt = mySearchCacheSvc.fetchByUuid(myUuid);
|
||||||
if (!search.isPresent()) {
|
if (!searchOpt.isPresent()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
setSearchEntity(search.get());
|
setSearchEntity(searchOpt.get());
|
||||||
|
|
||||||
ourLog.trace("Retrieved search with version {} and total {}", mySearchEntity.getVersion(), mySearchEntity.getTotalCount());
|
ourLog.trace("Retrieved search with version {} and total {}", mySearchEntity.getVersion(), mySearchEntity.getTotalCount());
|
||||||
|
|
||||||
|
@ -292,10 +292,16 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
|
||||||
SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearchEntity);
|
SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearchEntity);
|
||||||
|
|
||||||
Integer size = mySearchEntity.getTotalCount();
|
Integer size = mySearchEntity.getTotalCount();
|
||||||
if (size == null) {
|
if (size != null) {
|
||||||
return null;
|
return Math.max(0, size);
|
||||||
}
|
}
|
||||||
return Math.max(0, size);
|
|
||||||
|
if (mySearchEntity.getSearchType() == SearchTypeEnum.HISTORY) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return mySearchCoordinatorSvc.getSearchTotal(myUuid).orElse(null);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: Leave as protected, HSPC depends on this
|
// Note: Leave as protected, HSPC depends on this
|
||||||
|
|
|
@ -62,6 +62,8 @@ public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundl
|
||||||
public List<IBaseResource> getResources(int theFromIndex, int theToIndex) {
|
public List<IBaseResource> getResources(int theFromIndex, int theToIndex) {
|
||||||
SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
|
SearchCoordinatorSvcImpl.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
|
||||||
|
|
||||||
|
mySearchTask.awaitInitialSync();
|
||||||
|
|
||||||
ourLog.trace("Fetching search resource PIDs from task: {}", mySearchTask.getClass());
|
ourLog.trace("Fetching search resource PIDs from task: {}", mySearchTask.getClass());
|
||||||
final List<Long> pids = mySearchTask.getResourcePids(theFromIndex, theToIndex);
|
final List<Long> pids = mySearchTask.getResourcePids(theFromIndex, theToIndex);
|
||||||
ourLog.trace("Done fetching search resource PIDs");
|
ourLog.trace("Done fetching search resource PIDs");
|
||||||
|
|
|
@ -92,7 +92,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
public static final int DEFAULT_SYNC_SIZE = 250;
|
public static final int DEFAULT_SYNC_SIZE = 250;
|
||||||
|
|
||||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
|
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
|
||||||
private final ConcurrentHashMap<String, BaseTask> myIdToSearchTask = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
|
||||||
@Autowired
|
@Autowired
|
||||||
private FhirContext myContext;
|
private FhirContext myContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
|
@ -151,7 +151,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancelAllActiveSearches() {
|
public void cancelAllActiveSearches() {
|
||||||
for (BaseTask next : myIdToSearchTask.values()) {
|
for (SearchTask next : myIdToSearchTask.values()) {
|
||||||
next.requestImmediateAbort();
|
next.requestImmediateAbort();
|
||||||
try {
|
try {
|
||||||
next.getCompletionLatch().await(30, TimeUnit.SECONDS);
|
next.getCompletionLatch().await(30, TimeUnit.SECONDS);
|
||||||
|
@ -171,17 +171,35 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
|
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
|
||||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
||||||
|
|
||||||
|
// If we're actively searching right now, don't try to do anything until at least one batch has been
|
||||||
|
// persisted in the DB
|
||||||
|
SearchTask searchTask = myIdToSearchTask.get(theUuid);
|
||||||
|
if (searchTask != null) {
|
||||||
|
searchTask.awaitInitialSync();
|
||||||
|
}
|
||||||
|
|
||||||
|
ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
|
||||||
|
|
||||||
Search search;
|
Search search;
|
||||||
StopWatch sw = new StopWatch();
|
StopWatch sw = new StopWatch();
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
if (myNeverUseLocalSearchForUnitTests == false) {
|
if (myNeverUseLocalSearchForUnitTests == false) {
|
||||||
BaseTask task = myIdToSearchTask.get(theUuid);
|
if (searchTask != null) {
|
||||||
if (task != null) {
|
|
||||||
ourLog.trace("Local search found");
|
ourLog.trace("Local search found");
|
||||||
List<Long> resourcePids = task.getResourcePids(theFrom, theTo);
|
List<Long> resourcePids = searchTask.getResourcePids(theFrom, theTo);
|
||||||
if (resourcePids != null) {
|
if (resourcePids != null) {
|
||||||
return resourcePids;
|
ourLog.trace("Local search returned {} pids, wanted {}-{} - Search: {}", resourcePids.size(), theFrom, theTo, searchTask.getSearch());
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Generally, if a search task is open, the fastest possible thing is to just return its results. This
|
||||||
|
* will work most of the time, but can fail if the task hit a search threshold and the client is requesting
|
||||||
|
* results beyond that threashold. In that case, we'll keep going below, since that will trigger another
|
||||||
|
* task.
|
||||||
|
*/
|
||||||
|
if ((searchTask.getSearch().getNumFound() - searchTask.getSearch().getNumBlocked()) >= theTo || resourcePids.size() == (theTo - theFrom)) {
|
||||||
|
return resourcePids;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -189,18 +207,18 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
search = mySearchCacheSvc
|
search = mySearchCacheSvc
|
||||||
.fetchByUuid(theUuid)
|
.fetchByUuid(theUuid)
|
||||||
.orElseThrow(() -> {
|
.orElseThrow(() -> {
|
||||||
ourLog.debug("Client requested unknown paging ID[{}]", theUuid);
|
ourLog.trace("Client requested unknown paging ID[{}]", theUuid);
|
||||||
String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
|
String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
|
||||||
return new ResourceGoneException(msg);
|
return new ResourceGoneException(msg);
|
||||||
});
|
});
|
||||||
|
|
||||||
verifySearchHasntFailedOrThrowInternalErrorException(search);
|
verifySearchHasntFailedOrThrowInternalErrorException(search);
|
||||||
if (search.getStatus() == SearchStatusEnum.FINISHED) {
|
if (search.getStatus() == SearchStatusEnum.FINISHED) {
|
||||||
ourLog.debug("Search entity marked as finished with {} results", search.getNumFound());
|
ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (search.getNumFound() >= theTo) {
|
if (search.getNumFound() >= theTo) {
|
||||||
ourLog.debug("Search entity has {} results so far", search.getNumFound());
|
ourLog.trace("Search entity has {} results so far", search.getNumFound());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,11 +230,12 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
// If the search was saved in "pass complete mode" it's probably time to
|
// If the search was saved in "pass complete mode" it's probably time to
|
||||||
// start a new pass
|
// start a new pass
|
||||||
if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
|
if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
|
||||||
|
ourLog.trace("Going to try to start next search");
|
||||||
Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search);
|
Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search);
|
||||||
if (newSearch.isPresent()) {
|
if (newSearch.isPresent()) {
|
||||||
search = newSearch.get();
|
search = newSearch.get();
|
||||||
String resourceType = search.getResourceType();
|
String resourceType = search.getResourceType();
|
||||||
SearchParameterMap params = search.getSearchParameterMap();
|
SearchParameterMap params = search.getSearchParameterMap().orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
|
||||||
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
|
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
|
||||||
SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails);
|
SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails);
|
||||||
myIdToSearchTask.put(search.getUuid(), task);
|
myIdToSearchTask.put(search.getUuid(), task);
|
||||||
|
@ -231,10 +250,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ourLog.trace("Finished looping");
|
||||||
|
|
||||||
List<Long> pids = mySearchResultCacheSvc.fetchResultPids(search, theFrom, theTo);
|
List<Long> pids = mySearchResultCacheSvc.fetchResultPids(search, theFrom, theTo);
|
||||||
|
|
||||||
|
ourLog.trace("Fetched {} results", pids.size());
|
||||||
|
|
||||||
return pids;
|
return pids;
|
||||||
}
|
}
|
||||||
|
@ -290,6 +310,38 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<Integer> getSearchTotal(String theUuid) {
|
||||||
|
SearchTask task = myIdToSearchTask.get(theUuid);
|
||||||
|
if (task != null) {
|
||||||
|
return Optional.ofNullable(task.awaitInitialSync());
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In case there is no running search, if the total is listed as accurate we know one is coming
|
||||||
|
* so let's wait a bit for it to show up
|
||||||
|
*/
|
||||||
|
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
|
||||||
|
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
||||||
|
Optional<Search> search = mySearchCacheSvc.fetchByUuid(theUuid);
|
||||||
|
if (search.isPresent()) {
|
||||||
|
Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
|
||||||
|
if (searchParameterMap.isPresent() && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
if (search.isPresent()) {
|
||||||
|
verifySearchHasntFailedOrThrowInternalErrorException(search.get());
|
||||||
|
if (search.get().getTotalCount() != null) {
|
||||||
|
return Optional.of(search.get().getTotalCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
search = mySearchCacheSvc.fetchByUuid(theUuid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
private IBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString) {
|
private IBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString) {
|
||||||
StopWatch w = new StopWatch();
|
StopWatch w = new StopWatch();
|
||||||
|
@ -516,7 +568,20 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
myInterceptorBroadcaster = theInterceptorBroadcaster;
|
myInterceptorBroadcaster = theInterceptorBroadcaster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class BaseTask implements Callable<Void> {
|
/**
|
||||||
|
* A search task is a Callable task that runs in
|
||||||
|
* a thread pool to handle an individual search. One instance
|
||||||
|
* is created for any requested search and runs from the
|
||||||
|
* beginning to the end of the search.
|
||||||
|
* <p>
|
||||||
|
* Understand:
|
||||||
|
* This class executes in its own thread separate from the
|
||||||
|
* web server client thread that made the request. We do that
|
||||||
|
* so that we can return to the client as soon as possible,
|
||||||
|
* but keep the search going in the background (and have
|
||||||
|
* the next page of results ready to go when the client asks).
|
||||||
|
*/
|
||||||
|
public class SearchTask implements Callable<Void> {
|
||||||
private final SearchParameterMap myParams;
|
private final SearchParameterMap myParams;
|
||||||
private final IDao myCallingDao;
|
private final IDao myCallingDao;
|
||||||
private final String myResourceType;
|
private final String myResourceType;
|
||||||
|
@ -538,7 +603,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*/
|
*/
|
||||||
protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest) {
|
protected SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest) {
|
||||||
mySearch = theSearch;
|
mySearch = theSearch;
|
||||||
myCallingDao = theCallingDao;
|
myCallingDao = theCallingDao;
|
||||||
myParams = theParams;
|
myParams = theParams;
|
||||||
|
@ -549,6 +614,29 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
myRequest = theRequest;
|
myRequest = theRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is called by the server HTTP thread, and
|
||||||
|
* will block until at least one page of results have been
|
||||||
|
* fetched from the DB, and will never block after that.
|
||||||
|
*/
|
||||||
|
Integer awaitInitialSync() {
|
||||||
|
ourLog.trace("Awaiting initial sync");
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Shouldn't happen
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new InternalErrorException(e);
|
||||||
|
}
|
||||||
|
} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
|
||||||
|
ourLog.trace("Initial sync completed");
|
||||||
|
|
||||||
|
return getSearch().getTotalCount();
|
||||||
|
}
|
||||||
|
|
||||||
protected Search getSearch() {
|
protected Search getSearch() {
|
||||||
return mySearch;
|
return mySearch;
|
||||||
}
|
}
|
||||||
|
@ -1010,7 +1098,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public class SearchContinuationTask extends BaseTask {
|
public class SearchContinuationTask extends SearchTask {
|
||||||
|
|
||||||
public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest) {
|
public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest) {
|
||||||
super(theSearch, theCallingDao, theParams, theResourceType, theRequest);
|
super(theSearch, theCallingDao, theParams, theResourceType, theRequest);
|
||||||
|
@ -1041,51 +1129,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A search task is a Callable task that runs in
|
|
||||||
* a thread pool to handle an individual search. One instance
|
|
||||||
* is created for any requested search and runs from the
|
|
||||||
* beginning to the end of the search.
|
|
||||||
* <p>
|
|
||||||
* Understand:
|
|
||||||
* This class executes in its own thread separate from the
|
|
||||||
* web server client thread that made the request. We do that
|
|
||||||
* so that we can return to the client as soon as possible,
|
|
||||||
* but keep the search going in the background (and have
|
|
||||||
* the next page of results ready to go when the client asks).
|
|
||||||
*/
|
|
||||||
class SearchTask extends BaseTask {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
*/
|
|
||||||
SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails) {
|
|
||||||
super(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method is called by the server HTTP thread, and
|
|
||||||
* will block until at least one page of results have been
|
|
||||||
* fetched from the DB, and will never block after that.
|
|
||||||
*/
|
|
||||||
Integer awaitInitialSync() {
|
|
||||||
ourLog.trace("Awaiting initial sync");
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// Shouldn't happen
|
|
||||||
throw new InternalErrorException(e);
|
|
||||||
}
|
|
||||||
} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
|
|
||||||
ourLog.trace("Initial sync completed");
|
|
||||||
|
|
||||||
return getSearch().getTotalCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
|
public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
|
||||||
theSearch.setDeleted(false);
|
theSearch.setDeleted(false);
|
||||||
|
|
|
@ -58,9 +58,6 @@ public class DatabaseSearchResultCacheSvcImpl implements ISearchResultCacheSvc {
|
||||||
|
|
||||||
ourLog.debug("fetchResultPids for range {}-{} returned {} pids", theFrom, theTo, retVal.size());
|
ourLog.debug("fetchResultPids for range {}-{} returned {} pids", theFrom, theTo, retVal.size());
|
||||||
|
|
||||||
// FIXME: should we remove the blocked number from this message?
|
|
||||||
Validate.isTrue((theSearch.getNumFound() - theSearch.getNumBlocked()) < theTo || retVal.size() == (theTo - theFrom), "Failed to find results in cache, requested %d - %d and got %d with total found=%d and blocked %s", theFrom, theTo, retVal.size(), theSearch.getNumFound(), theSearch.getNumBlocked());
|
|
||||||
|
|
||||||
return new ArrayList<>(retVal);
|
return new ArrayList<>(retVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -367,6 +367,11 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
|
||||||
myInterceptorRegistry.registerInterceptor(myPerformanceTracingLoggingInterceptor);
|
myInterceptorRegistry.registerInterceptor(myPerformanceTracingLoggingInterceptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void beforeUnregisterAllSubscriptions() {
|
||||||
|
mySubscriptionRegistry.unregisterAllSubscriptions();
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeFlushFT() {
|
public void beforeFlushFT() {
|
||||||
runInTransaction(() -> {
|
runInTransaction(() -> {
|
||||||
|
|
|
@ -120,20 +120,24 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
||||||
myPatientDao.update(p);
|
myPatientDao.update(p);
|
||||||
|
|
||||||
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
|
myDaoConfig.setSearchPreFetchThresholds(Arrays.asList(20, 50, 190));
|
||||||
|
SearchParameterMap params;
|
||||||
|
IBundleProvider results;
|
||||||
|
String uuid;
|
||||||
|
List<String> ids;
|
||||||
|
|
||||||
// Search with count only
|
// Search with count only
|
||||||
SearchParameterMap params = new SearchParameterMap();
|
params = new SearchParameterMap();
|
||||||
params.add(Patient.SP_NAME, new StringParam("FAM"));
|
params.add(Patient.SP_NAME, new StringParam("FAM"));
|
||||||
params.setSummaryMode((SummaryEnum.COUNT));
|
params.setSummaryMode((SummaryEnum.COUNT));
|
||||||
IBundleProvider results = myPatientDao.search(params);
|
results = myPatientDao.search(params);
|
||||||
String uuid = results.getUuid();
|
uuid = results.getUuid();
|
||||||
ourLog.info("** Search returned UUID: {}", uuid);
|
ourLog.info("** Search returned UUID: {}", uuid);
|
||||||
assertEquals(201, results.size().intValue());
|
assertEquals(201, results.size().intValue());
|
||||||
List<String> ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
|
ids = toUnqualifiedVersionlessIdValues(results, 0, 10, true);
|
||||||
assertThat(ids, empty());
|
assertThat(ids, empty());
|
||||||
assertEquals(201, myDatabaseBackedPagingProvider.retrieveResultList(null, uuid).size().intValue());
|
assertEquals(201, myDatabaseBackedPagingProvider.retrieveResultList(null, uuid).size().intValue());
|
||||||
|
|
||||||
// Seach with total explicitly requested
|
// Search with total explicitly requested
|
||||||
params = new SearchParameterMap();
|
params = new SearchParameterMap();
|
||||||
params.add(Patient.SP_NAME, new StringParam("FAM"));
|
params.add(Patient.SP_NAME, new StringParam("FAM"));
|
||||||
params.setSearchTotalMode(SearchTotalModeEnum.ACCURATE);
|
params.setSearchTotalMode(SearchTotalModeEnum.ACCURATE);
|
||||||
|
@ -213,7 +217,6 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
||||||
assertEquals("Patient/PT00000", ids.get(0));
|
assertEquals("Patient/PT00000", ids.get(0));
|
||||||
assertEquals("Patient/PT00009", ids.get(9));
|
assertEquals("Patient/PT00009", ids.get(9));
|
||||||
|
|
||||||
await().until(() -> myDatabaseBackedPagingProvider.retrieveResultList(null, uuid).size() != null);
|
|
||||||
results = myDatabaseBackedPagingProvider.retrieveResultList(null, uuid);
|
results = myDatabaseBackedPagingProvider.retrieveResultList(null, uuid);
|
||||||
Integer resultsSize = results.size();
|
Integer resultsSize = results.size();
|
||||||
assertEquals(200, resultsSize.intValue());
|
assertEquals(200, resultsSize.intValue());
|
||||||
|
@ -354,6 +357,15 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
||||||
* 20 should be prefetched since that's the initial page size
|
* 20 should be prefetched since that's the initial page size
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
await().until(()->{
|
||||||
|
return runInTransaction(()->{
|
||||||
|
return mySearchEntityDao
|
||||||
|
.findByUuidAndFetchIncludes(uuid)
|
||||||
|
.orElseThrow(() -> new InternalErrorException(""))
|
||||||
|
.getStatus() == SearchStatusEnum.PASSCMPLET;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
runInTransaction(() -> {
|
runInTransaction(() -> {
|
||||||
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""));
|
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""));
|
||||||
assertEquals(20, search.getNumFound());
|
assertEquals(20, search.getNumFound());
|
||||||
|
@ -636,6 +648,10 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
||||||
assertEquals("Patient/PT00000", ids.get(0));
|
assertEquals("Patient/PT00000", ids.get(0));
|
||||||
assertEquals(1, ids.size());
|
assertEquals(1, ids.size());
|
||||||
|
|
||||||
|
await().until(()-> runInTransaction(()-> mySearchEntityDao
|
||||||
|
.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""))
|
||||||
|
.getStatus() == SearchStatusEnum.FINISHED));
|
||||||
|
|
||||||
runInTransaction(() -> {
|
runInTransaction(() -> {
|
||||||
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""));
|
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(uuid).orElseThrow(() -> new InternalErrorException(""));
|
||||||
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
|
assertEquals(SearchStatusEnum.FINISHED, search.getStatus());
|
||||||
|
@ -778,6 +794,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
|
||||||
search.getResources(0, 20);
|
search.getResources(0, 20);
|
||||||
ourLog.info("** Done retrieving resources");
|
ourLog.info("** Done retrieving resources");
|
||||||
|
|
||||||
|
await().until(()->myCaptureQueriesListener.countSelectQueries() == 4);
|
||||||
|
|
||||||
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
|
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
|
||||||
assertEquals(4, myCaptureQueriesListener.countSelectQueries());
|
assertEquals(4, myCaptureQueriesListener.countSelectQueries());
|
||||||
|
|
|
@ -293,6 +293,7 @@ public class SearchCoordinatorSvcImplTest {
|
||||||
myExpectedNumberOfSearchBuildersCreated = 4;
|
myExpectedNumberOfSearchBuildersCreated = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAsyncSearchSmallResultSetSameCoordinator() {
|
public void testAsyncSearchSmallResultSetSameCoordinator() {
|
||||||
SearchParameterMap params = new SearchParameterMap();
|
SearchParameterMap params = new SearchParameterMap();
|
||||||
|
|
Loading…
Reference in New Issue