Rel 6 1 2 mergeback (#4033)

* Bump for CVE (#3856)

* Bump for CVE

* Bump spring-data version

* Fix compile

* Cut over to spring bom

* Bump to RC1

* remove RC

* do not constrain reindex for common SP updates (#3876)

* only fast-track jobs with exactly one chunk (#3879)

* Fix IllegalStateException when an exception is thrown during stream response (#3882)

* Finish up changelog, minor refactor

* reset buffer only

* Hack for some replacements

* Failure handling

* wip

* Fixed the issue (#3845)

* Fixed the issue

* Changelog modification

* Changelog modification

* Implemented seventh character extended code and the corresponding dis… (#3709)

* Implemented seventh character extended code and the corresponding display

* Modifications

* Changes on previous test according to modifications made in ICD10-CM XML file

* Subscription sending delete events being skipped (#3888)

* fixed bug and added test

* refactor

* Update for CVE (#3895)

* updated pointcuts to work as intended (#3903)

* updated pointcuts to work as intended

* added changelog

* review fixes

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>

* 3904 during $delete expunge job hibernate search indexed documents are left orphaned (#3905)

* Add test and implementation

* Add changelog

* 3899 code in limits (#3901)

* Add implementation, changelog, test

* Update hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java

Co-authored-by: Ken Stevens <khstevens@gmail.com>

Co-authored-by: Ken Stevens <khstevens@gmail.com>

* 3884 overlapping searchparameter undetected rel 6 1 (#3909)

* Applying all changes from previous dev branch to current one pointing to rel_6_1

* Fixing merge conflict related to Msg.code value.

* Fixing Msg.code value.

* Making checkstyle happy.

* Making sure that all tests are passing.

* Passing all tests after fixing Msg.code

* Passing all tests.

Co-authored-by: peartree <etienne.poirier@smilecdr.com>

* 3745 - fixed NPE for bundle with duplicate conditional create resourc… (#3746)

* 3745 - fixed NPE for bundle with duplicate conditional create resources and a conditional delete

* created unit test for skipping the delete operation while processing duplicate create entries

* moved unit test to FhirSystemDaoR4Test

* 3379 mdm fixes (#3906)

* added MdmLinkCreateSvcimplTest

* fixed creating mdm-link not setting the resource type correctly

* fixed a bug where ResourcePersistenceId was being duplicated instead of passed on

* Update hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/MdmLinkCreateSvcImplTest.java

Change order of tests such that assertEquals takes expected value then actual value

Co-authored-by: jdar8 <69840459+jdar8@users.noreply.github.com>

* added changelog, also changed a setup function in test to beforeeach

Co-authored-by: Long Ma <long@smilecdr.com>
Co-authored-by: jdar8 <69840459+jdar8@users.noreply.github.com>

* Fix to the issue (#3855)

* Fix to the issue

* Progress

* fixed the issue

* Addressing suggestions

* add response status code to MethodOutcome

* Addressing suggestions

Co-authored-by: Ken Stevens <ken@smilecdr.com>

* Fix for caching appearing broken in batch2 for bulkexport jobs (#3912)

* Respect caching in bulk export, fix bug with completed date on empty jobs

* add changelog

* Add impl

* Add breaking test

* Complete failing test

* more broken tests

* Fix more tests

* Fix paging bug

* Fix another brittle test

* 3915 do not collapse rules with filters (#3916)

* do not attempt to merge compartment permissions with filters

* changelog

* Rename to IT for concurrency problems

Co-authored-by: Tadgh <garygrantgraham@gmail.com>

* Version bump

* fix $mdm-submit output (#3917)

Co-authored-by: Ken Stevens <ken@smilecdr.com>

* Gl3407 bundle offset size (#3918)

* begin with failing test

* fixed

* change log

* rollback default count change and corresponding comments

Co-authored-by: Ken Stevens <ken@smilecdr.com>

* Offset interceptor now only works for external calls

* Initialize some beans (esp interceptors) later in the boot process so they don't slow down startup.

* do not reindex searchparam jobs on startup

* Fix oracle non-enterprise attempting online index add (#3925)

* 3922 delete expunge large dataset (#3923)

* lower batch size of delete requests so that we do not get SQL exceptions

* blah

* fix test

* updated tests to not fail

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>

* add index

* Fix up column grab

* Revert offset mode change

* Revert fix for null/system request details checks for reindex purposes

* Fix bug and add test for SP Validating Interceptor (#3930)

* wip

* Fix up tests

* Fix index online test

* Fix SP validating interceptor logic

* Updating version to: 6.1.1 post release.

* fix compile error

* Deploy to sonatype (#3934)

* adding sonatype profile to checkstyle module

* adding sonatype profile to tinder module

* adding sonatype profile to base pom

* adding final deployToSonatype profile

* wip

* Revert version enum

* Updating version to: 6.1.1 post release.

* Add test, changelog, and implementation

* Add backport info

* Create failing test

* Implemented the fix, fixed existing unit tests

* added changelog

* added test case for no filter, exclude 1 patient

* wip

* Add backport info

* Add info of new version

* Updating version to: 6.1.2 post release.

* bump info and backport for 6.1.2

* Bump for hapi

* Implement bug fixes, add new tests (#4022)

* Implement bug fixes, add new tests

* tidy

* Tidy

* refactor for cleaning

* More tidying

* Lower logging

* Split into nested tests, rename, add todos

* Typo

* Code review

* add backport info

* Tidy versionenum

* Fix import

* Remove files that were repackaged

Co-authored-by: JasonRoberts-smile <85363818+JasonRoberts-smile@users.noreply.github.com>
Co-authored-by: Qingyixia <106992634+Qingyixia@users.noreply.github.com>
Co-authored-by: TipzCM <leif.stawnyczy@gmail.com>
Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
Co-authored-by: Ken Stevens <khstevens@gmail.com>
Co-authored-by: Etienne Poirier <33007955+epeartree@users.noreply.github.com>
Co-authored-by: peartree <etienne.poirier@smilecdr.com>
Co-authored-by: kateryna-mironova <107507153+kateryna-mironova@users.noreply.github.com>
Co-authored-by: longma1 <32119004+longma1@users.noreply.github.com>
Co-authored-by: Long Ma <long@smilecdr.com>
Co-authored-by: jdar8 <69840459+jdar8@users.noreply.github.com>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
Co-authored-by: markiantorno <markiantorno@gmail.com>
Co-authored-by: Steven Li <steven@smilecdr.com>
Tadgh, 2022-09-15 15:18:32 -07:00, committed by GitHub
commit c3339f38c3 (parent ae3faafc9a)
31 changed files with 885 additions and 164 deletions

@@ -34,6 +34,8 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class SearchParameterUtil {
@@ -98,9 +100,22 @@ public class SearchParameterUtil {
return patientSearchParam;
}
public static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParams(FhirContext theFhirContext, String theResourceType) {
public static List<RuntimeSearchParam> getAllPatientCompartmentRuntimeSearchParamsForResourceType(FhirContext theFhirContext, String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = theFhirContext.getResourceDefinition(theResourceType);
return getAllPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition);
}
public static List<RuntimeSearchParam> getAllPatientCompartmenRuntimeSearchParams(FhirContext theFhirContext) {
return theFhirContext.getResourceTypes()
.stream()
.flatMap(type -> getAllPatientCompartmentRuntimeSearchParamsForResourceType(theFhirContext, type).stream())
.collect(Collectors.toList());
}
public static Set<String> getAllResourceTypesThatAreInPatientCompartment(FhirContext theFhirContext) {
return theFhirContext.getResourceTypes().stream()
.filter(type -> getAllPatientCompartmentRuntimeSearchParamsForResourceType(theFhirContext, type).size() > 0)
.collect(Collectors.toSet());
}
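A minimal usage sketch of the helpers added above (the FhirContext setup, the "Observation" type, and the wrapper class are illustrative, not part of this change):

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.util.SearchParameterUtil;
import java.util.List;
import java.util.Set;

class PatientCompartmentExample {
    static void describePatientCompartment() {
        FhirContext ctx = FhirContext.forR4();
        // Resource types that carry at least one patient-compartment search parameter
        Set<String> compartmentTypes = SearchParameterUtil.getAllResourceTypesThatAreInPatientCompartment(ctx);
        // Patient-compartment search parameters for a single resource type
        List<RuntimeSearchParam> observationParams =
            SearchParameterUtil.getAllPatientCompartmentRuntimeSearchParamsForResourceType(ctx, "Observation");
        System.out.println(compartmentTypes.size() + " compartment types; Observation has " + observationParams.size() + " params");
    }
}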

@@ -102,7 +102,8 @@ public enum VersionEnum {
V6_0_5,
V6_1_0,
V6_1_1,
V6_2_0
V6_1_2,
V6_2_0,
;
public static VersionEnum latestVersion() {

@@ -0,0 +1,3 @@
---
release-date: "2022-09-12"
codename: "Unicorn"

@@ -1,6 +1,7 @@
---
type: add
jira: SMILE-4665
backport: 6.1.2
title: "Added support for AWS OpenSearch to Fulltext Search. If an AWS Region is configured, HAPI-FHIR will assume you intend to connect to an AWS-managed OpenSearch instance, and will use
Amazon's [DefaultAwsCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) to authenticate against it. If both username and password are provided, HAPI-FHIR will attempt to use them as a static credentials provider."
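As a rough illustration of the credential selection described above (a sketch only: the helper method and parameter names are hypothetical, and this is not HAPI-FHIR's actual implementation), the AWS SDK calls involved look roughly like this:

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

class OpenSearchCredentialsSketch {
    // Hypothetical helper: prefer static credentials when both values are configured,
    // otherwise fall back to the default AWS credentials provider chain.
    static AWSCredentialsProvider selectCredentials(String theUsername, String thePassword) {
        if (theUsername != null && thePassword != null) {
            return new AWSStaticCredentialsProvider(new BasicAWSCredentials(theUsername, thePassword));
        }
        return DefaultAWSCredentialsProviderChain.getInstance();
    }
}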

@@ -1,4 +1,5 @@
---
type: fix
issue: 4013
backport: 6.1.2
title: "In HAPI-FHIR 6.1.0, a regression was introduced into bulk export causing exports beyond the first one to fail in strange ways. This has been corrected."

@@ -0,0 +1,6 @@
---
type: fix
issue: 4022
backport: 6.1.2
title: "Fixed a Group Bulk Export bug which was causing it to fail to return resources due to an incorrect search."

@@ -0,0 +1,6 @@
---
type: fix
issue: 4022
backport: 6.1.2
title: "Fixed a Group Bulk Export bug in which the group members would not be expanded correctly."

@@ -0,0 +1,7 @@
---
type: fix
issue: 4022
backport: 6.1.2
title: "Fixed a bug in Group Bulk Export: If a group member was part of multiple groups , it was causing other groups to be included during Group Bulk Export, if the Group resource type was specified. Now, when
doing an export on a specific group, and you elect to return Group resources, only the called Group will be returned, regardless of cross-membership."

@@ -0,0 +1,6 @@
---
type: fix
issue: 4022
backport: 6.1.2
title: "Fixed a bug in Group Bulk Export where the server would crash in oracle due to too many clauses."

@@ -35,6 +35,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.narrative.BaseThymeleafNarrativeGenerator;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;

@@ -115,87 +115,116 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
String jobId = theParams.getJobId();
RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
Set<ResourcePersistentId> pids = new HashSet<>();
Set<ResourcePersistentId> pids;
if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.PATIENT) {
// Patient
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(Msg.code(797) + errorMessage);
}
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams);
String patientSearchParam = getPatientSearchParamForCurrentResourceType(theParams.getResourceType()).getName();
for (SearchParameterMap map : maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParametersForPatient(map, theParams);
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(theParams);
if (!resourceType.equalsIgnoreCase("Patient")) {
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
}
IResultIterator resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, jobId),
null,
RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
}
pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, def);
} else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) {
ourLog.trace("About to expand a Group Bulk Export");
// Group
if (resourceType.equalsIgnoreCase("Patient")) {
ourLog.info("Expanding Patients of a Group Bulk Export.");
return getExpandedPatientIterator(theParams);
}
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
if (ourLog.isDebugEnabled()) {
if (!expandedMemberResourceIds.isEmpty()) {
ourLog.debug("Group {} has been expanded to members:[{}]", theParams.getResourceType(), String.join(",", expandedMemberResourceIds));
}
}
//Next, let's search for the target resources, with their correct patient references, chunked.
//The results will be jammed into myReadPids
QueryChunker<String> queryChunker = new QueryChunker<>();
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, def);
});
pids = getPidsForGroupStyleExport(theParams, resourceType, def);
} else {
// System
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams);
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(theParams);
for (SearchParameterMap map : maps) {
// requires a transaction
IResultIterator resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, jobId),
null,
RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
}
pids = getPidsForSystemStyleExport(theParams, jobId, def);
}
ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size());
return pids.iterator();
}
private Set<ResourcePersistentId> getPidsForPatientStyleExport(ExportPIDIteratorParameters theParams, String resourceType, String jobId, RuntimeResourceDefinition def) {
Set<ResourcePersistentId> pids = new HashSet<>();
// Patient
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(Msg.code(797) + errorMessage);
}
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams);
String patientSearchParam = getPatientSearchParamForCurrentResourceType(theParams.getResourceType()).getName();
for (SearchParameterMap map : maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParametersForPatient(map, theParams);
ISearchBuilder searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
if (!resourceType.equalsIgnoreCase("Patient")) {
map.add(patientSearchParam, new ReferenceParam().setMissing(false));
}
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, jobId);
IResultIterator resultIterator = searchBuilder.createQuery(map, searchRuntime, null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
}
return pids;
}
private Set<ResourcePersistentId> getPidsForSystemStyleExport(ExportPIDIteratorParameters theParams, String theJobId, RuntimeResourceDefinition theDef) {
Set<ResourcePersistentId> pids = new HashSet<>();
// System
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams);
ISearchBuilder searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
for (SearchParameterMap map : maps) {
// requires a transaction
IResultIterator resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theJobId),
null,
RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
}
return pids;
}
private Set<ResourcePersistentId> getPidsForGroupStyleExport(ExportPIDIteratorParameters theParams, String theResourceType, RuntimeResourceDefinition theDef) {
Set<ResourcePersistentId> pids;
if (theResourceType.equalsIgnoreCase("Patient")) {
ourLog.info("Expanding Patients of a Group Bulk Export.");
pids = getExpandedPatientList(theParams);
} else if (theResourceType.equalsIgnoreCase("Group")) {
pids = getSingletonGroupList(theParams);
} else {
pids = getRelatedResourceTypePids(theParams, theDef);
}
return pids;
}
private Set<ResourcePersistentId> getRelatedResourceTypePids(ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) {
Set<ResourcePersistentId> pids = new HashSet<>();
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
assert expandedMemberResourceIds != null && !expandedMemberResourceIds.isEmpty();
if (ourLog.isDebugEnabled()) {
ourLog.debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds);
}
//Next, let's search for the target resources, with their correct patient references, chunked.
//The results will be jammed into myReadPids
QueryChunker<String> queryChunker = new QueryChunker<>();
queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef);
});
return pids;
}
private Set<ResourcePersistentId> getSingletonGroupList(ExportPIDIteratorParameters theParams) {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), SystemRequestDetails.newSystemRequestAllPartitions());
ResourcePersistentId pidOrNull = myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), group);
Set<ResourcePersistentId> pids = new HashSet<>();
pids.add(pidOrNull);
return pids;
}
/**
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
* Get a ISearchBuilder for the given resource type this partition is responsible for.
*/
protected ISearchBuilder getSearchBuilderForLocalResourceType(ExportPIDIteratorParameters theParams) {
String resourceType = theParams.getResourceType();
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
return mySearchBuilderFactory.newSearchBuilder(dao, resourceType, nextTypeClass);
protected ISearchBuilder getSearchBuilderForResourceType(String theResourceType) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
RuntimeResourceDefinition def = myContext.getResourceDefinition(theResourceType);
Class<? extends IBaseResource> typeClass = def.getImplementingClass();
return mySearchBuilderFactory.newSearchBuilder(dao, theResourceType, typeClass);
}
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) {
@@ -246,7 +275,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
* In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members,
* possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/
private Iterator<ResourcePersistentId> getExpandedPatientIterator(ExportPIDIteratorParameters theParameters) {
private Set<ResourcePersistentId> getExpandedPatientList(ExportPIDIteratorParameters theParameters) {
List<String> members = getMembersFromGroupWithFilter(theParameters);
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
ourLog.debug("While extracting patients from a group, we found {} patients.", ids.size());
@@ -266,7 +295,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
});
populateMdmResourceCache(goldenPidSourcePidTuple);
}
return patientPidsToExport.iterator();
return patientPidsToExport;
}
/**
@@ -275,23 +304,16 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
*/
private List<String> getMembersFromGroupWithFilter(ExportPIDIteratorParameters theParameters) {
RuntimeResourceDefinition def = myContext.getResourceDefinition(theParameters.getResourceType());
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
List<String> pids = new ArrayList<>();
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParameters);
maps.forEach(map -> addMembershipToGroupClause(map, theParameters.getGroupId()));
for (SearchParameterMap map : maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParametersForPatient(map, theParameters);
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(theParameters);
// Now, further filter the query with the group id.
HasOrListParam hasOrListParam = new HasOrListParam();
hasOrListParam.addOr(new HasParam("Group", "member", "_id", theParameters.getGroupId()));
map.add(PARAM_HAS, hasOrListParam);
ISearchBuilder searchBuilder = getSearchBuilderForResourceType("Patient");
ourLog.debug("Searching for members of group {} with job id {} with map {}", theParameters.getGroupId(), theParameters.getJobId(), map);
IResultIterator resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theParameters.getJobId()),
null,
@@ -303,6 +325,19 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
return pids;
}
/**
* This method takes an {@link SearchParameterMap} and adds a clause to it that will filter the search results to only
* return members of the defined group.
*
* @param theMap the map to add the clause to.
* @param theGroupId the group ID to filter by.
*/
private void addMembershipToGroupClause(SearchParameterMap theMap, String theGroupId) {
HasOrListParam hasOrListParam = new HasOrListParam();
hasOrListParam.addOr(new HasParam("Group", "member", "_id", theGroupId));
theMap.add(PARAM_HAS, hasOrListParam);
}
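For context, the clause built by this helper is a standard FHIR reverse-chaining (_has) search; a minimal sketch of how it is used and the request it corresponds to, assuming an illustrative group id "G2":

// Server-side map produced by the helper:
SearchParameterMap membershipExample = SearchParameterMap.newSynchronous();
addMembershipToGroupClause(membershipExample, "G2");
// Equivalent REST request:
//   GET [base]/Patient?_has:Group:member:_id=G2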
/**
* @param thePidTuples
*/
@@ -361,7 +396,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
validateSearchParametersForGroup(expandedSpMap, theParams.getResourceType());
// Fetch and cache a search builder for this resource type
ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType(theParams);
ISearchBuilder searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
// Now, further filter the query with patient references defined by the chunk of IDs we have.
if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) {
@@ -391,7 +426,8 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
ReferenceOrListParam orList = new ReferenceOrListParam();
idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
expandedSpMap.add(getPatientSearchParamForCurrentResourceType(theParams.getResourceType()).getName(), orList);
RuntimeSearchParam patientSearchParamForCurrentResourceType = getPatientSearchParamForCurrentResourceType(theParams.getResourceType());
expandedSpMap.add(patientSearchParamForCurrentResourceType.getName(), orList);
}
/**
@@ -449,7 +485,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
//Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
//so would be otherwise skipped
expandedIds.addAll(getMembersFromGroupWithFilter(theParams));
List<String> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams);
ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter);
expandedIds.addAll(membersFromGroupWithFilter);
return expandedIds;
}

@@ -53,5 +53,4 @@ public class QueryChunker<T> {
theBatchConsumer.accept(batch);
}
}
}

@@ -22,6 +22,9 @@ import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.HasOrListParam;
import ca.uhn.fhir.rest.param.HasParam;
import ch.qos.logback.classic.spi.ILoggingEvent;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Group;
@@ -31,6 +34,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatcher;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
@@ -48,6 +52,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -57,6 +62,7 @@ import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -0,0 +1,656 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.SearchParameterUtil;
import com.google.common.collect.Sets;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.Coverage;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(BulkExportUseCaseTest.class);
@Autowired
private IBatch2JobRunner myJobRunner;
@Nested
public class SystemBulkExportTests {
//TODO add use case tests here
}
@Nested
public class PatientBulkExportTests {
@BeforeEach
public void before() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@AfterEach
public void after() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
}
@Test
public void testPatientExportIgnoresResourcesNotInPatientCompartment() {
Patient patient = new Patient();
patient.setId("pat-1");
myPatientDao.update(patient);
Observation obs = new Observation();
obs.setId("obs-included");
obs.setSubject(new Reference("Patient/pat-1"));
myObservationDao.update(obs);
Observation obs2 = new Observation();
obs2.setId("obs-excluded");
myObservationDao.update(obs2);
HashSet<String> types = Sets.newHashSet("Patient", "Observation");
BulkExportJobResults bulkExportJobResults = startPatientBulkExportJobAndAwaitResults(types, new HashSet<String>(), "ha");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Patient"), hasSize(1));
assertThat(typeToResources.get("Observation"), hasSize(1));
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
assertThat(typeToContents.get("Observation"), containsString("obs-included"));
assertThat(typeToContents.get("Observation"), not(containsString("obs-excluded")));
}
}
@Nested
public class GroupBulkExportTests {
@Test
public void testVeryLargeGroup() {
Group group = new Group();
group.setId("Group/G");
group.setActive(true);
for (int i = 0; i < 600; i++) {
Patient patient = new Patient();
patient.setId("PING-" + i);
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
group.addMember().getEntity().setReference("Patient/PING-" + i);
Observation obs = new Observation();
obs.setId("obs-" + i);
obs.setSubject(new Reference("Patient/PING-" + i));
myClient.update().resource(obs).execute();
}
myClient.update().resource(group).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Group", "Patient", "Observation");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G");
Map<String, List<IBaseResource>> firstMap = convertJobResultsToResources(bulkExportJobResults);
assertThat(firstMap.keySet(), hasSize(3));
assertThat(firstMap.get("Group"), hasSize(1));
assertThat(firstMap.get("Patient"), hasSize(600));
assertThat(firstMap.get("Observation"), hasSize(600));
}
@Test
public void testGroupBulkExportMembershipShouldNotExpandIntoOtherGroups() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G1");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
patient = new Patient();
patient.setId("PING2");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
group.addMember().getEntity().setReference("Patient/PING2");
myClient.update().resource(group).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Group", "Patient");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G1");
Map<String, List<IBaseResource>> firstMap = convertJobResultsToResources(bulkExportJobResults);
assertThat(firstMap.get("Patient"), hasSize(1));
assertThat(firstMap.get("Group"), hasSize(1));
}
@Test
public void testDifferentTypesDoNotUseCachedResults() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
Observation o = new Observation();
o.setSubject(new Reference("Patient/PING1"));
o.setId("obs-included");
myClient.update().resource(o).execute();
Coverage c = new Coverage();
c.setBeneficiary(new Reference("Patient/PING1"));
c.setId("cov-included");
myClient.update().resource(c).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Observation", "Patient");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G2");
Map<String, List<IBaseResource>> firstMap = convertJobResultsToResources(bulkExportJobResults);
assertThat(firstMap.get("Patient"), hasSize(1));
assertThat(firstMap.get("Observation"), hasSize(1));
HashSet<String> otherResourceTypes = Sets.newHashSet("Coverage", "Patient");
BulkExportJobResults altBulkExportResults = startGroupBulkExportJobAndAwaitCompletion(otherResourceTypes, new HashSet<>(), "G2");
Map<String, List<IBaseResource>> secondMap = convertJobResultsToResources(altBulkExportResults);
assertThat(secondMap.get("Patient"), hasSize(1));
assertThat(secondMap.get("Coverage"), hasSize(1));
}
@Test
public void testGroupBulkExportNotInGroup_DoeNotShowUp() {
// Create some resources
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("PING2");
patient.setGender(Enumerations.AdministrativeGender.MALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("PNING3");
patient.setGender(Enumerations.AdministrativeGender.MALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
group.addMember().getEntity().setReference("Patient/PING2");
myClient.update().resource(group).execute();
verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\"", "\"PING2\""), Collections.singletonList("\"PNING3\""));
}
@Test
public void testTwoConsecutiveBulkExports() {
// Create some resources
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
myCaptureQueriesListener.clear();
verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\""));
myCaptureQueriesListener.logSelectQueries();
ourLog.error("************");
myCaptureQueriesListener.clear();
try {
verifyBulkExportResults("G2", new HashSet<>(), List.of("\"PING1\""), Collections.singletonList("\"PNING3\""));
} finally {
myCaptureQueriesListener.logSelectQueries();
}
}
@Test
public void testGroupExportPatientAndOtherResources() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
//Other patient not in group
Patient patient2 = new Patient();
patient2.setId("POG2");
patient2.setGender(Enumerations.AdministrativeGender.FEMALE);
patient2.setActive(true);
myClient.update().resource(patient2).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
Observation o = new Observation();
o.setSubject(new Reference("Patient/PING1"));
o.setId("obs-included");
myClient.update().resource(o).execute();
Observation o2 = new Observation();
o2.setSubject(new Reference("Patient/POG2"));
o2.setId("obs-excluded");
myClient.update().resource(o2).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Observation", "Patient");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G2");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Patient"), hasSize(1));
assertThat(typeToResources.get("Observation"), hasSize(1));
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
assertThat(typeToContents.get("Patient"), containsString("PING1"));
assertThat(typeToContents.get("Patient"), not(containsString("POG2")));
assertThat(typeToContents.get("Observation"), containsString("obs-included"));
assertThat(typeToContents.get("Observation"), not(containsString("obs-excluded")));
}
@Test
public void testGroupBulkExportWithTypeFilter() {
// Create some resources
Patient patient = new Patient();
patient.setId("PF");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("PM");
patient.setGender(Enumerations.AdministrativeGender.MALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PF");
group.addMember().getEntity().setReference("Patient/PM");
myClient.update().resource(group).execute();
//Create an observation for each patient
Observation femaleObs = new Observation();
femaleObs.setSubject(new Reference("Patient/PF"));
femaleObs.setId("obs-female");
myClient.update().resource(femaleObs).execute();
Observation maleObs = new Observation();
maleObs.setSubject(new Reference("Patient/PM"));
maleObs.setId("obs-male");
myClient.update().resource(maleObs).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Observation", "Patient");
HashSet<String> filters = Sets.newHashSet("Patient?gender=female");
BulkExportJobResults results = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, filters, "G");
Map<String, List<IBaseResource>> stringListMap = convertJobResultsToResources(results);
assertThat(stringListMap.get("Observation"), hasSize(1));
assertThat(stringListMap.get("Patient"), hasSize(1));
Map<String, String> typeToContents = convertJobResultsToStringContents(results);
assertThat(typeToContents.get("Observation"), not(containsString("obs-male")));
assertThat(typeToContents.get("Observation"), containsString("obs-female"));
}
@Test
public void testGroupExportOmitResourceTypesFetchesAll() {
// Create some resources
Patient patient = new Patient();
patient.setId("PF");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
patient = new Patient();
patient.setId("PM");
patient.setGender(Enumerations.AdministrativeGender.MALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
Group group = new Group();
group.setId("Group/G");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PF");
group.addMember().getEntity().setReference("Patient/PM");
myClient.update().resource(group).execute();
//Create an observation for each patient
Observation femaleObs = new Observation();
femaleObs.setSubject(new Reference("Patient/PF"));
femaleObs.setId("obs-female");
myClient.update().resource(femaleObs).execute();
Observation maleObs = new Observation();
maleObs.setSubject(new Reference("Patient/PM"));
maleObs.setId("obs-male");
myClient.update().resource(maleObs).execute();
Coverage coverage = new Coverage();
coverage.setBeneficiary(new Reference("Patient/PM"));
coverage.setId("coverage-male");
myClient.update().resource(coverage).execute();
coverage = new Coverage();
coverage.setBeneficiary(new Reference("Patient/PF"));
coverage.setId("coverage-female");
myClient.update().resource(coverage).execute();
HashSet<String> resourceTypes = Sets.newHashSet(SearchParameterUtil.getAllResourceTypesThatAreInPatientCompartment(myFhirContext));
HashSet<String> filters = Sets.newHashSet();
BulkExportJobResults results = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, filters, "G");
Map<String, List<IBaseResource>> typeToResource = convertJobResultsToResources(results);
assertThat(typeToResource.keySet(), hasSize(4));
assertThat(typeToResource.get("Group"), hasSize(1));
assertThat(typeToResource.get("Observation"), hasSize(2));
assertThat(typeToResource.get("Coverage"), hasSize(2));
assertThat(typeToResource.get("Patient"), hasSize(2));
}
@Test
public void testGroupExportPatientOnly() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
//Other patient not in group
Patient patient2 = new Patient();
patient2.setId("POG2");
patient2.setGender(Enumerations.AdministrativeGender.FEMALE);
patient2.setActive(true);
myClient.update().resource(patient2).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Patient");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G2");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Patient"), hasSize(1));
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
assertThat(typeToContents.get("Patient"), containsString("PING1"));
assertThat(typeToContents.get("Patient"), not(containsString("POG2")));
}
@Test
public void testGroupBulkExportMultipleResourceTypes() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
//Other patient not in group
Patient patient2 = new Patient();
patient2.setId("POG2");
patient2.setGender(Enumerations.AdministrativeGender.FEMALE);
patient2.setActive(true);
myClient.update().resource(patient2).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
Observation o = new Observation();
o.setSubject(new Reference("Patient/PING1"));
o.setId("obs-included");
myClient.update().resource(o).execute();
Coverage coverage = new Coverage();
coverage.setBeneficiary(new Reference("Patient/PING1"));
coverage.setId("coverage-included");
myClient.update().resource(coverage).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Observation", "Coverage");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G2");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Observation"), hasSize(1));
assertThat(typeToResources.get("Coverage"), hasSize(1));
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
assertThat(typeToContents.get("Observation"), containsString("obs-included"));
assertThat(typeToContents.get("Coverage"), containsString("coverage-included"));
}
@Test
public void testGroupBulkExportOverLargeDataset() {
Patient patient = new Patient();
patient.setId("PING1");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
myClient.update().resource(patient).execute();
//Other patient not in group
Patient patient2 = new Patient();
patient2.setId("POG2");
patient2.setGender(Enumerations.AdministrativeGender.FEMALE);
patient2.setActive(true);
myClient.update().resource(patient2).execute();
Group group = new Group();
group.setId("Group/G2");
group.setActive(true);
group.addMember().getEntity().setReference("Patient/PING1");
myClient.update().resource(group).execute();
for (int i = 0; i < 1000; i++) {
Observation o = new Observation();
o.setSubject(new Reference("Patient/PING1"));
o.setId("obs-included-" + i);
myClient.update().resource(o).execute();
}
for (int i = 0; i < 100; i++) {
Observation o = new Observation();
o.setSubject(new Reference("Patient/POG2"));
o.setId("obs-not-included-" + i);
myClient.update().resource(o).execute();
}
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(Sets.newHashSet("Observation"), new HashSet<>(), "G2");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Observation"), hasSize(1000));
Map<String, String> typeToContents = convertJobResultsToStringContents(bulkExportJobResults);
assertThat(typeToContents.get("Observation"), not(containsString("not-included")));
assertThat(typeToContents.get("Observation"), containsString("obs-included-0"));
assertThat(typeToContents.get("Observation"), containsString("obs-included-999"));
}
}
private Map<String, String> convertJobResultsToStringContents(BulkExportJobResults theResults) {
Map<String, String> typeToResources = new HashMap<>();
for (Map.Entry<String, List<String>> entry : theResults.getResourceTypeToBinaryIds().entrySet()) {
typeToResources.put(entry.getKey(), "");
StringBuilder sb = new StringBuilder();
List<String> binaryIds = entry.getValue();
for (String binaryId : binaryIds) {
String contents = getBinaryContentsAsString(binaryId);
if (!contents.endsWith("\n")) {
contents = contents + "\n";
}
sb.append(contents);
}
typeToResources.put(entry.getKey(), sb.toString());
}
return typeToResources;
}
Map<String, List<IBaseResource>> convertJobResultsToResources(BulkExportJobResults theResults) {
Map<String, String> stringStringMap = convertJobResultsToStringContents(theResults);
Map<String, List<IBaseResource>> typeToResources = new HashMap<>();
stringStringMap.entrySet().forEach(entry -> typeToResources.put(entry.getKey(), convertNDJSONToResources(entry.getValue())));
return typeToResources;
}
private List<IBaseResource> convertNDJSONToResources(String theValue) {
IParser iParser = myFhirContext.newJsonParser();
return theValue.lines()
.map(iParser::parseResource)
.toList();
}
private String getBinaryContentsAsString(String theBinaryId) {
Binary binary = myBinaryDao.read(new IdType(theBinaryId));
assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
String contents = new String(binary.getContent(), Constants.CHARSET_UTF8);
return contents;
}
BulkExportJobResults startGroupBulkExportJobAndAwaitCompletion(HashSet<String> theResourceTypes, HashSet<String> theFilters, String theGroupId) {
return startBulkExportJobAndAwaitCompletion(BulkDataExportOptions.ExportStyle.GROUP, theResourceTypes, theFilters, theGroupId);
}
BulkExportJobResults startBulkExportJobAndAwaitCompletion(BulkDataExportOptions.ExportStyle theExportStyle, HashSet<String> theResourceTypes, HashSet<String> theFilters, String theGroupOrPatientId) {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(theResourceTypes);
options.setFilters(theFilters);
options.setExportStyle(theExportStyle);
if (theExportStyle == BulkDataExportOptions.ExportStyle.GROUP) {
options.setGroupId(new IdType("Group", theGroupOrPatientId));
}
if (theExportStyle == BulkDataExportOptions.ExportStyle.PATIENT && theGroupOrPatientId != null) {
//TODO add support for this actual processor.
//options.setPatientId(new IdType("Patient", theGroupOrPatientId));
}
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
Batch2JobStartResponse startResponse = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));
assertNotNull(startResponse);
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
await().until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
String report = myJobRunner.getJobInfo(startResponse.getJobId()).getReport();
BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
return results;
}
private void verifyBulkExportResults(String theGroupId, HashSet<String> theFilters, List<String> theContainedList, List<String> theExcludedList) {
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setGroupId(new IdType("Group", theGroupId));
options.setFilters(theFilters);
options.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
Batch2JobStartResponse startResponse = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));
assertNotNull(startResponse);
// Run a scheduled pass to build the export
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
await().until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
// Iterate over the files
String report = myJobRunner.getJobInfo(startResponse.getJobId()).getReport();
BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
for (Map.Entry<String, List<String>> file : results.getResourceTypeToBinaryIds().entrySet()) {
List<String> binaryIds = file.getValue();
assertEquals(1, binaryIds.size());
for (String binaryId : binaryIds) {
Binary binary = myBinaryDao.read(new IdType(binaryId));
assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
String contents = new String(binary.getContent(), Constants.CHARSET_UTF8);
ourLog.info("Next contents for type {} :\n{}", binary.getResourceType(), contents);
for (String containedString : theContainedList) {
assertThat(contents, Matchers.containsString(containedString));
}
for (String excludedString : theExcludedList) {
assertThat(contents, not(Matchers.containsString(excludedString)));
}
}
}
}
private BulkExportJobResults startPatientBulkExportJobAndAwaitResults(HashSet<String> theTypes, HashSet<String> theFilters, String thePatientId) {
return startBulkExportJobAndAwaitCompletion(BulkDataExportOptions.ExportStyle.PATIENT, theTypes, theFilters, thePatientId);
}
}

@@ -41,6 +41,8 @@ import javax.persistence.EntityManager;
import java.util.List;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;

@@ -160,6 +160,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.sql.DataSource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

@@ -64,6 +64,10 @@
<appender-ref ref="STDOUT" />
</logger>
<logger name="ca.uhn.fhir.jpa.bulk" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<root level="info">
<appender-ref ref="STDOUT" />
</root>

@@ -33,13 +33,9 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
@@ -74,11 +70,10 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters();
ourLog.info("Step 2 for bulk export - Expand resources");
ourLog.info("About to expand {} resource IDs into their full resource bodies.", idList.getIds().size());
// search the resources
IBundleProvider bundle = fetchAllResources(idList);
List<IBaseResource> allResources = fetchAllResources(idList);
List<IBaseResource> allResources = bundle.getAllResources();
// if necessary, expand resources
if (jobParameters.isExpandMdm()) {
@@ -98,7 +93,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
output.setResourceType(idList.getResourceType());
theDataSink.accept(output);
ourLog.trace("Expanding of {} resources of type {} completed",
ourLog.info("Expanding of {} resources of type {} completed",
idList.getIds().size(),
idList.getResourceType());
@@ -106,16 +101,17 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
return RunOutcome.SUCCESS;
}
private IBundleProvider fetchAllResources(BulkExportIdList theIds) {
private List<IBaseResource> fetchAllResources(BulkExportIdList theIds) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theIds.getResourceType());
List<IBaseResource> resources = new ArrayList<>();
SearchParameterMap map = SearchParameterMap.newSynchronous();
TokenOrListParam ids = new TokenOrListParam();
for (Id id : theIds.getIds()) {
ids.addOr(new TokenParam(id.toPID().getAssociatedResourceId().getValue()));
String value = id.getId();
// This should be a query, but we have PIDs, and we don't have a _pid search param. TODO GGG, figure out how to make this search by pid.
resources.add(dao.readByPid(new ResourcePersistentId(value)));
}
map.add(Constants.PARAM_ID, ids);
return dao.search(map, SystemRequestDetails.forAllPartitions());
return resources;
}
private List<String> encodeToString(List<IBaseResource> theResources, BulkExportJobParameters theParameters) {

@@ -45,7 +45,7 @@ import java.util.List;
public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobParameters, BulkExportIdList> {
private static final Logger ourLog = LoggerFactory.getLogger(FetchResourceIdsStep.class);
public static final int MAX_IDS_TO_BATCH = 1000;
public static final int MAX_IDS_TO_BATCH = 900;
@Autowired
private IBulkExportProcessor myBulkExportProcessor;
@@ -63,7 +63,6 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
providerParams.setExportStyle(params.getExportStyle());
providerParams.setGroupId(params.getGroupId());
providerParams.setExpandMdm(params.isExpandMdm());
ourLog.info("Running FetchResourceIdsStep with params: {}", providerParams);
int submissionCount = 0;
try {
@@ -71,11 +70,12 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
providerParams.setResourceType(resourceType);
// filters are the filters for searching
ourLog.info("Running FetchResourceIdsStep with params: {}", providerParams);
Iterator<ResourcePersistentId> pidIterator = myBulkExportProcessor.getResourcePidIterator(providerParams);
List<Id> idsToSubmit = new ArrayList<>();
if (!pidIterator.hasNext()) {
ourLog.warn("Bulk Export generated an iterator with no results!");
ourLog.debug("Bulk Export generated an iterator with no results!");
}
while (pidIterator.hasNext()) {
ResourcePersistentId pid = pidIterator.next();

@@ -66,6 +66,7 @@ public class WriteBinaryStep implements IJobStepWorker<BulkExportJobParameters,
BulkExportExpandedResources expandedResources = theStepExecutionDetails.getData();
ourLog.info("Write binary step of Job Export");
ourLog.info("Writing {} resources to binary file", expandedResources.getStringifiedResources().size());
@SuppressWarnings("unchecked")
IFhirResourceDao<IBaseBinary> binaryDao = myDaoRegistry.getResourceDao("Binary");

@@ -81,10 +81,4 @@ public class Id implements IModelJson {
id.setResourceType(theResourceType);
return id;
}
public ResourcePersistentId toPID() {
ResourcePersistentId pid = new ResourcePersistentId(myId);
pid.setAssociatedResourceId(new IdType(myId));
return pid;
}
}

@@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
@@ -114,19 +115,14 @@ public class ExpandResourcesStepTest {
resources.add(patient);
}
idList.setIds(ids);
IBundleProvider bundleProvider = mock(IBundleProvider.class);
StepExecutionDetails<BulkExportJobParameters, BulkExportIdList> input = createInput(
idList,
createParameters(),
instance
);
// when
- when(bundleProvider.getAllResources())
-     .thenReturn(resources);
- when(patientDao.search(any(SearchParameterMap.class), any()))
-     .thenReturn(bundleProvider);
+ ArrayList<IBaseResource> clone = new ArrayList<>(resources);
+ when(patientDao.readByPid(any(ResourcePersistentId.class))).thenAnswer(i -> clone.remove(0));
// test
RunOutcome outcome = mySecondStep.run(input, sink);
@ -134,28 +130,6 @@ public class ExpandResourcesStepTest {
// verify
assertEquals(RunOutcome.SUCCESS, outcome);
// search parameters
- ArgumentCaptor<SearchParameterMap> captor = ArgumentCaptor.forClass(SearchParameterMap.class);
- verify(patientDao)
-     .search(captor.capture(), any());
- assertEquals(1, captor.getAllValues().size());
- SearchParameterMap map = captor.getValue();
- Collection<List<List<IQueryParameterType>>> values = map.values();
- Set<String> idValues = new HashSet<>();
- for (List<List<IQueryParameterType>> parameterTypes : values) {
-     for (List<IQueryParameterType> param : parameterTypes) {
-         for (IQueryParameterType type : param) {
-             String value = type.getValueAsQueryToken(myFhirContext);
-             idValues.add(value);
-             Id findingId = new Id();
-             findingId.setId(value);
-             findingId.setResourceType("Patient");
-             assertTrue(ids.contains(findingId));
-         }
-     }
- }
- assertEquals(ids.size(), idValues.size());
// data sink
ArgumentCaptor<BulkExportExpandedResources> expandedCaptor = ArgumentCaptor.forClass(BulkExportExpandedResources.class);
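Note: with the search mock and its verification block removed, the test stubs readByPid instead. thenReturn would hand back the same resource on every call, so the stub uses thenAnswer so that each invocation consumes the next prepared resource. A generic version of that Mockito pattern (names are illustrative):

    // Each call to the mocked DAO pops the next element of the prepared list.
    List<IBaseResource> remaining = new ArrayList<>(resources);
    when(patientDao.readByPid(any(ResourcePersistentId.class)))
        .thenAnswer(invocation -> remaining.remove(0));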

View File

@ -150,7 +150,8 @@ public class FetchResourceIdsStepTest {
List<ILoggingEvent> events = logCaptor.getAllValues();
assertTrue(events.get(0).getMessage().contains("Starting BatchExport job"));
assertTrue(events.get(1).getMessage().contains("Running FetchResource"));
- assertTrue(events.get(2).getFormattedMessage().contains("Submitted "
+ assertTrue(events.get(2).getMessage().contains("Running FetchResource"));
+ assertTrue(events.get(3).getFormattedMessage().contains("Submitted "
+ parameters.getResourceTypes().size()
+ " groups of ids for processing"
));

View File

@ -26,6 +26,8 @@ import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.jpa.batch.log.Logs;
@ -34,6 +36,8 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Date;
import java.util.Optional;
public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

View File

@ -32,6 +32,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.isBlank;

View File

@ -167,4 +167,5 @@ public interface IIdHelperService {
*/
Set<String> translatePidsToFhirResourceIds(Set<ResourcePersistentId> thePids);
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.export.model;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import java.util.List;
@ -125,14 +126,7 @@ public class ExportPIDIteratorParameters {
@Override
public String toString() {
- return new ToStringBuilder(this)
-     .append("myResourceType", myResourceType)
-     .append("myStartDate", myStartDate)
-     .append("myFilters", myFilters)
-     .append("myJobId", myJobId)
-     .append("myExportStyle", myExportStyle)
-     .append("myGroupId", myGroupId)
-     .append("myExpandMdm", myExpandMdm)
-     .toString();
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
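Note: the hand-maintained append() chain is replaced with reflection, so fields added to ExportPIDIteratorParameters later show up in toString() automatically. A small, self-contained illustration of what ToStringStyle.SHORT_PREFIX_STYLE produces (the example class and values are made up):

    import org.apache.commons.lang3.builder.ToStringBuilder;
    import org.apache.commons.lang3.builder.ToStringStyle;

    class Demo {
        private String myResourceType = "Patient";
        private boolean myExpandMdm = true;

        @Override
        public String toString() {
            // Prints something like: Demo[myResourceType=Patient,myExpandMdm=true]
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }
    }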

View File

@ -54,6 +54,7 @@ import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.util.ArrayUtil;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.OperationOutcomeUtil;
+ import ca.uhn.fhir.util.SearchParameterUtil;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IIdType;
@ -215,19 +216,11 @@ public class BulkDataExportProvider {
private Set<String> getPatientCompartmentResources() {
if (myCompartmentResources == null) {
- myCompartmentResources = myFhirContext.getResourceTypes().stream()
-     .filter(this::resourceTypeIsInPatientCompartment)
-     .collect(Collectors.toSet());
+ myCompartmentResources = SearchParameterUtil.getAllResourceTypesThatAreInPatientCompartment(myFhirContext);
}
return myCompartmentResources;
}
- private boolean resourceTypeIsInPatientCompartment(String theResourceType) {
-     RuntimeResourceDefinition runtimeResourceDefinition = myFhirContext.getResourceDefinition(theResourceType);
-     List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
-     return searchParams != null && searchParams.size() >= 1;
- }
/**
* Patient/$export
*/

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IIdType;
+ import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
@ -35,7 +36,10 @@ import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
+ import static org.slf4j.LoggerFactory.getLogger;
public class BulkExportHelperService {
+ private static final Logger ourLog = getLogger(BulkExportHelperService.class);
@Autowired
private MatchUrlService myMatchUrlService;
@ -45,13 +49,17 @@ public class BulkExportHelperService {
public List<SearchParameterMap> createSearchParameterMapsForResourceType(RuntimeResourceDefinition theDef, ExportPIDIteratorParameters theParams) {
String resourceType = theDef.getName();
- String[] typeFilters = theParams.getFilters().toArray(new String[0]); // lame...
+ List<String> typeFilters = theParams.getFilters();
  List<SearchParameterMap> spMaps = null;
- spMaps = Arrays.stream(typeFilters)
+ spMaps = typeFilters.stream()
.filter(typeFilter -> typeFilter.startsWith(resourceType + "?"))
.map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef, theParams.getStartDate()))
.collect(Collectors.toList());
+ typeFilters.stream().filter(filter -> !filter.contains("?")).forEach(filter -> {
+     ourLog.warn("Found a strange _typeFilter that we could not process: {}. _typeFilters should follow the format ResourceType?searchparameter=value .", filter);
+ });
//None of the _typeFilters applied to the current resource type, so just make a simple one.
if (spMaps.isEmpty()) {
SearchParameterMap defaultMap = new SearchParameterMap();
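Note: the new warning above flags _typeFilter values that do not follow the ResourceType?searchparameter=value form; only filters whose prefix matches the resource type currently being exported are turned into SearchParameterMaps. A hedged illustration of that prefix check (the filter values are made up):

    // For resourceType = "Patient":
    //   "Patient?active=true"       -> matched, becomes a SearchParameterMap
    //   "Observation?status=final"  -> skipped here, belongs to another resource type
    //   "Patient"                   -> no '?', triggers the "strange _typeFilter" warning
    List<String> applicable = typeFilters.stream()
        .filter(filter -> filter.startsWith(resourceType + "?"))
        .collect(Collectors.toList());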

View File

@ -48,6 +48,7 @@ public class BulkExportUtils {
parameters.setResourceTypes(new ArrayList<>(theOptions.getResourceTypes()));
}
parameters.setExpandMdm(theOptions.isExpandMdm());
+ parameters.setUseExistingJobsFirst(true);
return parameters;
}