diff --git a/hapi-fhir-jpaserver-base/pom.xml b/hapi-fhir-jpaserver-base/pom.xml
index 5f89a1668eb..56fbea3a8b8 100644
--- a/hapi-fhir-jpaserver-base/pom.xml
+++ b/hapi-fhir-jpaserver-base/pom.xml
@@ -230,6 +230,7 @@
+
io.dogote
@@ -259,6 +260,7 @@
javax.annotation-api
+
org.apache.derby
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
index c47a02dbd2b..043d9775df1 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/BatchJobsConfig.java
@@ -28,7 +28,9 @@ import org.springframework.context.annotation.Import;
//When you define a new batch job, add it here.
@Import({
CommonBatchJobConfig.class,
- BulkExportJobConfig.class,})
+ BulkExportJobConfig.class
+})
public class BatchJobsConfig {
- //Empty config, as this is just an aggregator for all the various batch jobs defined around the system.
+ public static final String BULK_EXPORT_JOB_NAME = "bulkExportJob";
+ public static final String GROUP_BULK_EXPORT_JOB_NAME = "groupBulkExportJob";
}
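
These constants replace raw string literals so that the name used to build each Spring Batch job and the name used to inject it cannot drift apart. A minimal sketch of the pattern (ExampleJobWiring and ExampleRunner are illustrative names, not part of this diff):

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ExampleJobWiring {
        // The bean name ("bulkExportJob") and the job name both equal the
        // constant's value, so consumers can inject by the same constant.
        @Bean
        public Job bulkExportJob(JobBuilderFactory theJobBuilderFactory, Step theFirstStep) {
            return theJobBuilderFactory.get(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
                .start(theFirstStep)
                .build();
        }
    }

    class ExampleRunner {
        // @Qualifier resolves against the bean name; using the constant in both
        // places removes the stringly-typed coupling the hunk above fixes.
        @Autowired
        @Qualifier(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
        private Job myBulkExportJob;
    }
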
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java
new file mode 100644
index 00000000000..29e6fcf5cbe
--- /dev/null
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BaseBulkItemReader.java
@@ -0,0 +1,140 @@
+package ca.uhn.fhir.jpa.bulk.job;
+
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.context.RuntimeResourceDefinition;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
+import ca.uhn.fhir.jpa.batch.log.Logs;
+import ca.uhn.fhir.jpa.dao.ISearchBuilder;
+import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
+import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
+import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.jpa.model.util.JpaConstants;
+import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
+import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
+import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
+import ca.uhn.fhir.rest.param.DateRangeParam;
+import ca.uhn.fhir.util.UrlUtil;
+import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.slf4j.Logger;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
+ private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
+
+ @Value("#{stepExecutionContext['resourceType']}")
+ protected String myResourceType;
+ @Value("#{jobExecutionContext['" + BulkExportJobConfig.JOB_UUID_PARAMETER + "']}")
+ protected String myJobUUID;
+ @Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
+ protected Long myReadChunkSize;
+ @Autowired
+ protected DaoRegistry myDaoRegistry;
+ @Autowired
+ protected FhirContext myContext;
+ @Autowired
+ private IBulkExportJobDao myBulkExportJobDao;
+ @Autowired
+ protected SearchBuilderFactory mySearchBuilderFactory;
+ @Autowired
+ private MatchUrlService myMatchUrlService;
+
+ private ISearchBuilder mySearchBuilder;
+ private BulkExportJobEntity myJobEntity;
+ private RuntimeResourceDefinition myResourceDefinition;
+
+ private Iterator<ResourcePersistentId> myPidIterator;
+
+ /**
+ * Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
+ */
+ protected ISearchBuilder getSearchBuilderForLocalResourceType() {
+ if (mySearchBuilder == null) {
+ IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
+ RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
+ Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
+ mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
+ }
+ return mySearchBuilder;
+ }
+
+ /**
+ * Generate the list of PIDs of all resources of the given myResourceType.
+ * Store them in a member iterator.
+ */
+ protected void loadResourcePids() {
+ //Initialize an array to hold the pids of the target resources to be exported.
+ myPidIterator = getResourcePidIterator();
+ }
+
+ abstract Iterator<ResourcePersistentId> getResourcePidIterator();
+
+ protected SearchParameterMap createSearchParameterMapForJob() {
+ BulkExportJobEntity jobEntity = getJobEntity();
+ RuntimeResourceDefinition theDef = getResourceDefinition();
+ SearchParameterMap map = new SearchParameterMap();
+ Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
+ String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
+ if (typeFilters != null) {
+ Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
+ if (filter.isPresent()) {
+ String matchUrl = filter.get();
+ map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
+ }
+ }
+ if (jobEntity.getSince() != null) {
+ map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
+ }
+ map.setLoadSynchronous(true);
+ return map;
+ }
+
+ protected RuntimeResourceDefinition getResourceDefinition() {
+ if (myResourceDefinition == null) {
+ myResourceDefinition = myContext.getResourceDefinition(myResourceType);
+ }
+ return myResourceDefinition;
+ }
+
+ protected BulkExportJobEntity getJobEntity() {
+ if (myJobEntity == null) {
+ Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
+ if (jobOpt.isPresent()) {
+ myJobEntity = jobOpt.get();
+ } else {
+ String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID);
+ throw new IllegalStateException(errorMessage);
+ }
+ }
+ return myJobEntity;
+ }
+
+ @Override
+ public List<ResourcePersistentId> read() {
+
+ ourLog.info("Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", getJobEntity(), myResourceType, myJobUUID);
+
+ if (myPidIterator == null) {
+ loadResourcePids();
+ }
+
+ int count = 0;
+ List<ResourcePersistentId> outgoing = new ArrayList<>();
+ while (myPidIterator.hasNext() && count < myReadChunkSize) {
+ outgoing.add(myPidIterator.next());
+ count += 1;
+ }
+
+ return outgoing.size() == 0 ? null : outgoing;
+
+ }
+}
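
The read() method above implements the standard Spring Batch end-of-input contract: hand back up to myReadChunkSize PIDs per call, and return null once the iterator is drained. A framework-free sketch of that contract (ChunkedReaderSketch is hypothetical, not part of this diff):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ChunkedReaderSketch<T> {
        private final Iterator<T> myIterator;
        private final long myChunkSize;

        public ChunkedReaderSketch(Iterator<T> theIterator, long theChunkSize) {
            myIterator = theIterator;
            myChunkSize = theChunkSize;
        }

        // Same contract as BaseBulkItemReader.read(): one chunk per call, null at end-of-input.
        public List<T> read() {
            List<T> outgoing = new ArrayList<>();
            while (myIterator.hasNext() && outgoing.size() < myChunkSize) {
                outgoing.add(myIterator.next());
            }
            return outgoing.isEmpty() ? null : outgoing;
        }

        public static void main(String[] args) {
            ChunkedReaderSketch<Integer> reader =
                new ChunkedReaderSketch<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
            List<Integer> chunk;
            while ((chunk = reader.read()) != null) {
                System.out.println(chunk); // prints [1, 2], then [3, 4], then [5]
            }
        }
    }
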
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
index 284136359ce..4db16392b82 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java
@@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
+import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@@ -46,8 +47,10 @@ import java.util.List;
*/
@Configuration
public class BulkExportJobConfig {
+
public static final String JOB_UUID_PARAMETER = "jobUUID";
public static final String READ_CHUNK_PARAMETER = "readChunkSize";
+ public static final String EXPAND_MDM_PARAMETER = "expandMdm";
public static final String GROUP_ID_PARAMETER = "groupId";
public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes";
public static final int CHUNK_SIZE = 100;
@@ -69,7 +72,7 @@ public class BulkExportJobConfig {
@Bean
@Lazy
public Job bulkExportJob() {
- return myJobBuilderFactory.get("bulkExportJob")
+ return myJobBuilderFactory.get(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(partitionStep())
@@ -80,7 +83,7 @@ public class BulkExportJobConfig {
@Bean
@Lazy
public Job groupBulkExportJob() {
- return myJobBuilderFactory.get("groupBulkExportJob")
+ return myJobBuilderFactory.get(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
.validator(groupBulkJobParameterValidator())
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
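
Note the group job chains two .validator(...) calls. When several parameter checks need to run as a single validator, Spring Batch's CompositeJobParametersValidator is the usual tool; a minimal sketch with illustrative lambdas (these are not the PR's actual validators):

    import java.util.Arrays;

    import org.springframework.batch.core.JobParametersInvalidException;
    import org.springframework.batch.core.JobParametersValidator;
    import org.springframework.batch.core.job.CompositeJobParametersValidator;

    public class ValidatorSketch {
        public static JobParametersValidator compositeValidator() {
            JobParametersValidator requiresUuid = theParams -> {
                if (theParams == null || theParams.getString(BulkExportJobConfig.JOB_UUID_PARAMETER) == null) {
                    throw new JobParametersInvalidException("jobUUID is required");
                }
            };
            JobParametersValidator requiresGroup = theParams -> {
                if (theParams == null || theParams.getString(BulkExportJobConfig.GROUP_ID_PARAMETER) == null) {
                    throw new JobParametersInvalidException("groupId is required");
                }
            };
            // Validators run in order; the first failure aborts the job launch.
            CompositeJobParametersValidator composite = new CompositeJobParametersValidator();
            composite.setValidators(Arrays.asList(requiresUuid, requiresGroup));
            return composite;
        }
    }
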
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java
index 175fb9cd792..a88d1a2f648 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkItemReader.java
@@ -20,15 +20,12 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
-import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
-import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
-import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
@@ -38,11 +35,8 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
-import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
-import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Arrays;
@@ -51,89 +45,28 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
+/**
+ * Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
+ * on a given resource type.
+ */
+public class BulkItemReader extends BaseBulkItemReader {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
- Iterator<ResourcePersistentId> myPidIterator;
- @Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
- private Long myReadChunkSize;
- @Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
- private String myJobUUID;
- @Value("#{stepExecutionContext['resourceType']}")
- private String myResourceType;
-
- @Autowired
- private IBulkExportJobDao myBulkExportJobDao;
- @Autowired
- private DaoRegistry myDaoRegistry;
- @Autowired
- private FhirContext myContext;
- @Autowired
- private SearchBuilderFactory mySearchBuilderFactory;
-
- @Autowired
- private MatchUrlService myMatchUrlService;
-
- private void loadResourcePids() {
- Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
- if (!jobOpt.isPresent()) {
- ourLog.warn("Job appears to be deleted");
- return;
- }
- BulkExportJobEntity jobEntity = jobOpt.get();
- ourLog.info("Bulk export starting generation for batch export job: {}", jobEntity);
-
- IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
-
+ @Override
+ Iterator<ResourcePersistentId> getResourcePidIterator() {
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
- RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
- Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
- ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
- SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def);
-
- if (jobEntity.getSince() != null) {
- map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
- }
-
- map.setLoadSynchronous(true);
+ SearchParameterMap map = createSearchParameterMapForJob();
+ ISearchBuilder sb = getSearchBuilderForLocalResourceType();
IResultIterator myResultIterator = sb.createQuery(map, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
+
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
}
- myPidIterator = myReadPids.iterator();
+ return myReadPids.iterator();
}
- private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) {
- SearchParameterMap map = new SearchParameterMap();
- Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(theJobEntity.getRequest());
- String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
- if (typeFilters != null) {
- Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
- if (filter.isPresent()) {
- String matchUrl = filter.get();
- map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
- }
- }
- return map;
- }
-
- @Override
- public List<ResourcePersistentId> read() {
- if (myPidIterator == null) {
- loadResourcePids();
- }
- int count = 0;
- List<ResourcePersistentId> outgoing = new ArrayList<>();
- while (myPidIterator.hasNext() && count < myReadChunkSize) {
- outgoing.add(myPidIterator.next());
- count += 1;
- }
-
- return outgoing.size() == 0 ? null : outgoing;
-
- }
}
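
The refactor above is a template method: BaseBulkItemReader owns the chunked read() loop and the lazy iterator setup, and each subclass supplies only getResourcePidIterator(). A framework-free sketch of the shape (hypothetical names, not part of this diff):

    import java.util.Arrays;
    import java.util.Iterator;

    // Template-method sketch: the base class fixes the skeleton (lazily obtain
    // the iterator once, then drain it); subclasses vary exactly one step.
    abstract class BaseReaderSketch {
        private Iterator<Long> myPidIterator;

        // The step that varies: plain search vs. group/MDM expansion.
        abstract Iterator<Long> getResourcePidIterator();

        final Long readOne() {
            if (myPidIterator == null) {
                myPidIterator = getResourcePidIterator(); // computed once, like loadResourcePids()
            }
            return myPidIterator.hasNext() ? myPidIterator.next() : null;
        }
    }

    class PlainReaderSketch extends BaseReaderSketch {
        @Override
        Iterator<Long> getResourcePidIterator() {
            return Arrays.asList(1L, 2L, 3L).iterator(); // stands in for the real search
        }
    }
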
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkExportJobParametersBuilder.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkExportJobParametersBuilder.java
index 9b6a3a90002..4f87cf3e454 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkExportJobParametersBuilder.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkExportJobParametersBuilder.java
@@ -5,4 +5,9 @@ public class GroupBulkExportJobParametersBuilder extends BulkExportJobParameters
this.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
return this;
}
+
+ public GroupBulkExportJobParametersBuilder setMdm(boolean theMdm) {
+ this.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, String.valueOf(theMdm));
+ return this;
+ }
}
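
A usage sketch for the new setter, mirroring the type-filter test added later in this diff (theJobUuid is a placeholder for the UUID of a previously submitted BulkExportJobEntity):

    import org.springframework.batch.core.JobParameters;

    public class GroupJobParametersSketch {
        public static JobParameters build(String theJobUuid) {
            GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
            paramBuilder.setGroupId("Group/123");  // GROUP_ID_PARAMETER
            paramBuilder.setMdm(true);             // EXPAND_MDM_PARAMETER, stored as the string "true"
            paramBuilder.setJobUUID(theJobUuid);   // JOB_UUID_PARAMETER
            paramBuilder.setReadChunkSize(10L);    // READ_CHUNK_PARAMETER
            return paramBuilder.toJobParameters();
        }
    }
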
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java
index fdd3bfeff65..f57729cdff7 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java
@@ -20,145 +20,219 @@ package ca.uhn.fhir.jpa.bulk.job;
* #L%
*/
-import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
-import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
-import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
-import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
-import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
-import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
+import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
-import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
-import ca.uhn.fhir.model.api.Include;
+import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
+import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
-import ca.uhn.fhir.rest.param.DateRangeParam;
-import ca.uhn.fhir.rest.param.HasParam;
-import ca.uhn.fhir.util.UrlUtil;
+import ca.uhn.fhir.rest.param.ReferenceOrListParam;
+import ca.uhn.fhir.rest.param.ReferenceParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.hl7.fhir.instance.model.api.IIdType;
+import org.hl7.fhir.instance.model.api.IPrimitiveType;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import javax.annotation.Nonnull;
-import javax.persistence.EntityManager;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
-public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
+/**
+ * Bulk Item reader for the Group Bulk Export job.
+ * Instead of performing a normal query on the resource type using type filters, we instead
+ *
+ * 1. Get the group ID defined for this job
+ * 2. Expand its membership so we get references to all patients in the group
+ * 3. Optionally further expand that into all MDM-matched Patients (including golden resources)
+ * 4. Then perform normal bulk export, filtered so that only results that refer to members are returned.
+ */
+public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
- Iterator<ResourcePersistentId> myPidIterator;
+ public static final int QUERY_CHUNK_SIZE = 100;
- @Value("#{stepExecutionContext['resourceType']}")
- private String myResourceType;
@Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}")
private String myGroupId;
- @Value("#{jobExecutionContext['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
- private String myJobUUID;
- @Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
- private Long myReadChunkSize;
+ @Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "'] ?: false}")
+ private boolean myMdmEnabled;
@Autowired
- private IBulkExportJobDao myBulkExportJobDao;
+ private IdHelperService myIdHelperService;
@Autowired
- private DaoRegistry myDaoRegistry;
- @Autowired
- private FhirContext myContext;
- @Autowired
- private SearchBuilderFactory mySearchBuilderFactory;
- @Autowired
- private EntityManager myEntityManager;
+ private IMdmLinkDao myMdmLinkDao;
- private void loadResourcePids() {
- Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
- if (!jobOpt.isPresent()) {
- ourLog.warn("Job appears to be deleted");
- return;
- }
- BulkExportJobEntity jobEntity = jobOpt.get();
- ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);
-
- //Fetch all the pids given the query.
- ISearchBuilder searchBuilder = getSearchBuilder();
-
- //Build complex-ish _has query with a revincludes which allows lookup by group membership
- SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);
-
- IResultIterator resultIterator = searchBuilder.createQuery(
- searchParameterMap,
- new SearchRuntimeDetails(null, myJobUUID),
- null,
- RequestPartitionId.allPartitions()
- );
+ private RuntimeSearchParam myPatientSearchParam;
+ @Override
+ Iterator<ResourcePersistentId> getResourcePidIterator() {
List<ResourcePersistentId> myReadPids = new ArrayList<>();
- while (resultIterator.hasNext()) {
- myReadPids.add(resultIterator.next());
+
+ //Short circuit out if we detect we are attempting to extract patients
+ if (myResourceType.equalsIgnoreCase("Patient")) {
+ return getExpandedPatientIterator();
}
- //Given that databases explode when you have an IN clause with >1000 resources, we use the QueryChunker to break this into multiple queries.
- List<ResourcePersistentId> revIncludePids = new ArrayList<>();
- QueryChunker<ResourcePersistentId> chunker = new QueryChunker<>();
+ //First lets expand the group so we get a list of all patient IDs of the group, and MDM-matched patient IDs of the group.
+ Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup();
+ if (ourLog.isDebugEnabled()) {
+ ourLog.debug("Group/{} has been expanded to members:[{}]", myGroupId, String.join(",", expandedMemberResourceIds));
+ }
- chunker.chunk(myReadPids, pidChunk -> {
- revIncludePids.addAll(searchBuilder.loadIncludes(myContext, myEntityManager, pidChunk, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null));
+ //Next, let's search for the target resources, with their correct patient references, chunked.
+ //The results will be jammed into myReadPids
+ QueryChunker<String> queryChunker = new QueryChunker<>();
+ queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
+ queryResourceTypeWithReferencesToPatients(myReadPids, idChunk);
});
- myPidIterator = revIncludePids.iterator();
- }
-
- //For all group revinclude queries, you need to perform the search on the Patient DAO, which is why this is hardcoded here.
- private ISearchBuilder getSearchBuilder() {
- IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
- RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
- Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
- return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass);
- }
-
- @Nonnull
- private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
- SearchParameterMap searchParameterMap = new SearchParameterMap();
- searchParameterMap.add("_has", new HasParam("Group", "member", "_id", myGroupId));
-
- String revIncludeString = buildRevIncludeString();
- searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
-
- if (jobEntity.getSince() != null) {
- searchParameterMap.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
+ if (ourLog.isDebugEnabled()) {
+ ourLog.debug("Resource PIDs to be Bulk Exported: [{}]", myReadPids.stream().map(ResourcePersistentId::toString).collect(Collectors.joining(",")));
}
- searchParameterMap.setLoadSynchronous(true);
- return searchParameterMap;
+ return myReadPids.iterator();
}
/**
- * Given the resource type of the job, fetch its patient compartment name, formatted for usage in an Include.
- * e.g. Immunization -> Immunization:patient
- *
- * @return A string which can be dropped directly into an Include.
+ * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members,
+ * possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
*/
- private String buildRevIncludeString() {
- RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
- RuntimeSearchParam patientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
- if (patientSearchParam == null) {
- patientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
- if (patientSearchParam == null) {
- patientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
+ private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
+ Set<Long> patientPidsToExport = new HashSet<>();
+ //This gets all member pids
+ List<String> members = getMembers();
+ List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
+ List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
+ patientPidsToExport.addAll(pidsOrThrowException);
+
+ if (myMdmEnabled) {
+ IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+ Long pidOrNull = myIdHelperService.getPidOrNull(group);
+ List<List<Long>> lists = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
+ lists.forEach(patientPidsToExport::addAll);
+ }
+ List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
+ .stream()
+ .map(ResourcePersistentId::new)
+ .collect(Collectors.toList());
+ return resourcePersistentIds.iterator();
+ }
+
+ /**
+ * Given the local myGroupId, read this group, and find all members' patient references.
+ * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"])
+ */
+ private List<String> getMembers() {
+ IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+ List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
+ return evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
+ }
+
+
+
+ /**
+ * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
+ * If myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to expand the membership into all
+ * MDM-matched patients.
+ *
+ * @return a Set of Strings representing the resource IDs of all members of a group.
+ */
+ private Set<String> expandAllPatientPidsFromGroup() {
+ Set<String> expandedIds = new HashSet<>();
+ IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId));
+ Long pidOrNull = myIdHelperService.getPidOrNull(group);
+
+ //Attempt to perform MDM Expansion of membership
+ if (myMdmEnabled) {
+ List<List<Long>> goldenPidTargetPidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
+ //Now let's translate these pids into resource IDs
+ Set<Long> uniquePids = new HashSet<>();
+ goldenPidTargetPidTuple.forEach(uniquePids::addAll);
+ Map<Long, Optional<String>> longOptionalMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
+ expandedIds = longOptionalMap.values().stream().map(Optional::get).collect(Collectors.toSet());
+ }
+
+ //Now manually add the members of the group (it's possible, even with MDM expansion, that some members don't have
+ //MDM matches and would otherwise be skipped).
+ expandedIds.addAll(getMembers());
+
+ return expandedIds;
+ }
+
+
+ private void queryResourceTypeWithReferencesToPatients(List<ResourcePersistentId> myReadPids, List<String> idChunk) {
+ //Build SP map
+ //First, inject the _typeFilters and _since from the export job
+ SearchParameterMap expandedSpMap = createSearchParameterMapForJob();
+
+ //Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
+ validateSearchParameters(expandedSpMap);
+
+ // Now, further filter the query with patient references defined by the chunk of IDs we have.
+ filterSearchByResourceIds(idChunk, expandedSpMap);
+
+ // Fetch and cache a search builder for this resource type
+ ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
+
+ //Execute the query and add all found pids to our local list.
+ IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
+ while (resultIterator.hasNext()) {
+ myReadPids.add(resultIterator.next());
+ }
+ }
+
+ private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
+ ReferenceOrListParam orList = new ReferenceOrListParam();
+ idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
+ expandedSpMap.add(getPatientSearchParam().getName(), orList);
+ }
+
+ private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
+ RuntimeSearchParam runtimeSearchParam = getPatientSearchParam();
+ if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
+ throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
+ }
+ return runtimeSearchParam;
+ }
+
+ /**
+ * Given the resource type, fetch its patient-based search parameter name
+ * 1. Attempt to find one called 'patient'
+ * 2. If that fails, find one called 'subject'
+ * 3. If that fails, find by Patient Compartment.
+ * 3.1 If that returns >1 result, throw an error
+ * 3.2 If that returns 1 result, return it
+ */
+ private RuntimeSearchParam getPatientSearchParam() {
+ if (myPatientSearchParam == null) {
+ RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
+ myPatientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
+ if (myPatientSearchParam == null) {
+ myPatientSearchParam = runtimeResourceDefinition.getSearchParam("subject");
+ if (myPatientSearchParam == null) {
+ myPatientSearchParam = getRuntimeSearchParamByCompartment(runtimeResourceDefinition);
+ if (myPatientSearchParam == null) {
+ String errorMessage = String.format("[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", myResourceType);
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
}
}
- String includeString = runtimeResourceDefinition.getName() + ":" + patientSearchParam.getName();
- return includeString;
+ return myPatientSearchParam;
}
/**
@@ -178,20 +252,4 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
- @Override
- public List<ResourcePersistentId> read() {
- if (myPidIterator == null) {
- loadResourcePids();
- }
- int count = 0;
- List outgoing = new ArrayList<>();
- while (myPidIterator.hasNext() && count < myReadChunkSize) {
- outgoing.add(myPidIterator.next());
- count += 1;
- }
-
- return outgoing.size() == 0 ? null : outgoing;
-
- }
}
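
The getPatientSearchParam() fallback chain above (try "patient", then "subject", then the patient compartment, else fail) can be expressed generically. A framework-free sketch of the chain (hypothetical helper, not part of this diff):

    import java.util.function.Supplier;

    public class PatientParamFallbackSketch {
        // e.g. firstNonNull("Immunization",
        //        () -> def.getSearchParam("patient"),
        //        () -> def.getSearchParam("subject"),
        //        () -> searchParamFromPatientCompartment(def));
        @SafeVarargs
        static <T> T firstNonNull(String theResourceType, Supplier<T>... theLookups) {
            for (Supplier<T> lookup : theLookups) {
                T result = lookup.get();
                if (result != null) {
                    return result; // the first lookup that yields a param wins
                }
            }
            throw new IllegalArgumentException(String.format(
                "[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!",
                theResourceType));
        }
    }
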
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
index 6a3dd22bcb8..e251bcd31d1 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java
@@ -189,6 +189,10 @@ public class BulkDataExportProvider {
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType theType,
+ @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType theSince,
+ @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter,
+ @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType theMdm,
+
ServletRequestDetails theRequestDetails
) {
@@ -207,10 +211,20 @@ public class BulkDataExportProvider {
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
- Date since = null;
- boolean theMdm = false;
- IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, theMdm));
+ Date since = null;
+ if (theSince != null) {
+ since = theSince.getValue();
+ }
+
+ boolean mdm = false;
+ if (theMdm != null) {
+ mdm = theMdm.getValue();
+ }
+ if (theTypeFilter != null) {
+ filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
+ }
+ IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, mdm));
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" + JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + outcome.getJobId();
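
With the new operation parameters in place, a client kicking off a group export could POST a Parameters body like the following to Group/[id]/$export; the underscore-prefixed names match the JpaConstants values used above (illustrative sketch, not part of this diff):

    import ca.uhn.fhir.rest.api.Constants;
    import org.hl7.fhir.r4.model.InstantType;
    import org.hl7.fhir.r4.model.Parameters;
    import org.hl7.fhir.r4.model.StringType;

    public class GroupExportRequestSketch {
        public static Parameters build() {
            Parameters input = new Parameters();
            input.addParameter("_outputFormat", new StringType(Constants.CT_FHIR_NDJSON));
            input.addParameter("_type", new StringType("Immunization,Observation"));
            input.addParameter("_since", InstantType.now());
            input.addParameter("_typeFilter", new StringType("Immunization?vaccine-code=vaccines|COVID-19"));
            input.addParameter("_mdm", true); // the new boolean parameter
            return input;
        }
    }
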
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
index f52a9b3870a..14be984a072 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java
@@ -24,6 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
+import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
@@ -105,11 +106,11 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private IBatchJobSubmitter myJobSubmitter;
@Autowired
- @Qualifier("bulkExportJob")
+ @Qualifier(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myBulkExportJob;
@Autowired
- @Qualifier("groupBulkExportJob")
+ @Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private org.springframework.batch.core.Job myGroupBulkExportJob;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
@@ -138,9 +139,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
- String theGroupId = getGroupIdIfPresent(bulkExportJobEntity.getRequest());
try {
- processJob(jobUuid, theGroupId);
+ processJob(bulkExportJobEntity);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@@ -156,15 +156,17 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
- private String getGroupIdIfPresent(String theRequestString) {
+
+ private String getQueryParameterIfPresent(String theRequestString, String theParameter) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
if (stringMap != null) {
- String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
+ String[] strings = stringMap.get(theParameter);
if (strings != null) {
return String.join(",", strings);
}
}
return null;
+
}
@@ -215,7 +217,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
- private void processJob(String theJobUuid, String theGroupId) {
+ private void processJob(BulkExportJobEntity theBulkExportJobEntity) {
+ String theJobUuid = theBulkExportJobEntity.getJobId();
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
@@ -223,8 +226,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
- if (!StringUtils.isBlank(theGroupId)) {
- parameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
+ if (isGroupBulkJob(theBulkExportJobEntity)) {
+ enhanceBulkParametersWithGroupParameters(theBulkExportJobEntity, parameters);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
@@ -234,6 +237,17 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
+ private void enhanceBulkParametersWithGroupParameters(BulkExportJobEntity theBulkExportJobEntity, JobParametersBuilder theParameters) {
+ String theGroupId = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID);
+ String expandMdm = getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_MDM);
+ theParameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
+ theParameters.addString(BulkExportJobConfig.EXPAND_MDM_PARAMETER, expandMdm);
+ }
+
+ private boolean isGroupBulkJob(BulkExportJobEntity theBulkExportJobEntity) {
+ return getQueryParameterIfPresent(theBulkExportJobEntity.getRequest(), JpaConstants.PARAM_EXPORT_GROUP_ID) != null;
+ }
+
@SuppressWarnings("unchecked")
private IFhirResourceDao getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
@@ -284,8 +298,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
if (theBulkDataExportOptions instanceof GroupBulkDataExportOptions) {
GroupBulkDataExportOptions groupOptions = (GroupBulkDataExportOptions) theBulkDataExportOptions;
requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_GROUP_ID).append("=").append(groupOptions.getGroupId().getValue());
- //TODO GGG eventually we will support this
-// requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
+ requestBuilder.append("&").append(JpaConstants.PARAM_EXPORT_MDM).append("=").append(groupOptions.isMdm());
}
String request = requestBuilder.toString();
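
The round trip here is worth spelling out: submitJob() serializes _groupId and _mdm onto the job's stored request string, and getQueryParameterIfPresent() re-reads them when the job is launched. A sketch using the same UrlUtil call as the hunk above (illustrative values, shaped like the request string asserted in the tests later in this diff):

    import java.util.Map;

    import ca.uhn.fhir.util.UrlUtil;

    public class RequestRoundTripSketch {
        public static void main(String[] args) {
            String request = "/$export?_outputFormat=application%2Ffhir%2Bndjson"
                + "&_type=Immunization,Observation&_groupId=123&_mdm=true";
            Map<String, String[]> params = UrlUtil.parseQueryString(request);
            System.out.println(params.get("_groupId")[0]); // "123" -> route to the group job
            System.out.println(params.get("_mdm")[0]);     // "true" -> EXPAND_MDM_PARAMETER
        }
    }
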
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java
index c6b5945a117..7ad98e46e11 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IMdmLinkDao.java
@@ -28,6 +28,8 @@ import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
+import java.util.List;
+
@Repository
public interface IMdmLinkDao extends JpaRepository<MdmLink, Long> {
@Modifying
@@ -37,4 +39,16 @@ public interface IMdmLinkDao extends JpaRepository {
@Modifying
@Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult")
int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult);
+
+ @Query("SELECT ml2.myGoldenResourcePid, ml2.mySourcePid FROM MdmLink ml2 " +
+ "WHERE ml2.myMatchResult=:matchResult " +
+ "AND ml2.myGoldenResourcePid IN (" +
+ "SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
+ "INNER JOIN ResourceLink hrl " +
+ "ON hrl.myTargetResourcePid=ml.mySourcePid " +
+ "AND hrl.mySourceResourcePid=:groupPid " +
+ "AND hrl.mySourcePath='Group.member.entity' " +
+ "AND hrl.myTargetResourceType='Patient'" +
+ ")")
+ List<List<Long>> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
}
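
The JPQL above does a two-hop expansion: the inner SELECT finds the golden-resource pids of patients directly referenced by the Group (via ResourceLink rows with path 'Group.member.entity'), and the outer SELECT returns every (goldenPid, sourcePid) MATCH pair under those goldens, i.e. the members plus all their MDM "siblings". An in-memory sketch of the same computation (illustrative, not part of this diff):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MdmExpansionSketch {
        // Each link is {goldenPid, sourcePid} with match result MATCH.
        public static List<Long[]> expand(Set<Long> theGroupMemberPids, List<Long[]> theMatchLinks) {
            Set<Long> goldenPids = new HashSet<>();
            for (Long[] link : theMatchLinks) {
                if (theGroupMemberPids.contains(link[1])) {
                    goldenPids.add(link[0]); // step 1: goldens of direct group members
                }
            }
            List<Long[]> pairs = new ArrayList<>();
            for (Long[] link : theMatchLinks) {
                if (goldenPids.contains(link[0])) {
                    pairs.add(link); // step 2: members plus their MDM-matched siblings
                }
            }
            return pairs;
        }
    }
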
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java
index 9a6688033d4..a8795a47d4f 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/index/IdHelperService.java
@@ -381,7 +381,6 @@ public class IdHelperService {
}
public Map<Long, Optional<String>> translatePidsToForcedIds(Set<Long> thePids) {
-
Map<Long, Optional<String>> retVal = new HashMap<>(myMemoryCacheService.getAllPresent(MemoryCacheService.CacheEnum.FORCED_ID, thePids));
List<Long> remainingPids = thePids
@@ -434,6 +433,12 @@ public class IdHelperService {
return resourcePersistentIds.get(0).getIdAsLong();
}
+ @Nonnull
+ public List<Long> getPidsOrThrowException(List<IIdType> theIds) {
+ List<ResourcePersistentId> resourcePersistentIds = this.resolveResourcePersistentIdsWithCache(RequestPartitionId.allPartitions(), theIds);
+ return resourcePersistentIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList());
+ }
+
@Nonnull
public Long getPidOrThrowException(IAnyResource theResource) {
Long retVal = (Long) theResource.getUserData(RESOURCE_PID);
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/QueryChunker.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/QueryChunker.java
index 1bf7348e624..2855f6a57a2 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/QueryChunker.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/QueryChunker.java
@@ -35,8 +35,12 @@ import java.util.function.Consumer;
public class QueryChunker<T> {
public void chunk(List<T> theInput, Consumer<List<T>> theBatchConsumer) {
- for (int i = 0; i < theInput.size(); i += SearchBuilder.getMaximumPageSize()) {
- int to = i + SearchBuilder.getMaximumPageSize();
+ chunk(theInput, SearchBuilder.getMaximumPageSize(), theBatchConsumer);
+ }
+
+ public void chunk(List<T> theInput, int theChunkSize, Consumer<List<T>> theBatchConsumer) {
+ for (int i = 0; i < theInput.size(); i += theChunkSize) {
+ int to = i + theChunkSize;
to = Math.min(to, theInput.size());
List<T> batch = theInput.subList(i, to);
theBatchConsumer.accept(batch);
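
A usage sketch for the new chunk overload, as GroupBulkItemReader uses it with QUERY_CHUNK_SIZE = 100 to keep SQL IN(...) clauses small (illustrative, not part of this diff):

    import java.util.ArrayList;
    import java.util.List;

    import ca.uhn.fhir.jpa.util.QueryChunker;

    public class QueryChunkerUsageSketch {
        public static void main(String[] args) {
            List<String> memberIds = new ArrayList<>();
            for (int i = 0; i < 250; i++) {
                memberIds.add("Patient/" + i);
            }
            new QueryChunker<String>().chunk(memberIds, 100, idChunk -> {
                // The consumer is called with slices of 100, 100 and 50 entries.
                System.out.println("querying chunk of " + idChunk.size());
            });
        }
    }
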
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java
index 3c45edfd03f..45ea95c7afb 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportProviderTest.java
@@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -306,11 +308,14 @@ public class BulkDataExportProviderTest {
InstantType now = InstantType.now();
+
Parameters input = new Parameters();
+ StringType obsTypeFilter = new StringType("Observation?code=OBSCODE,DiagnosticReport?code=DRCODE");
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Observation, DiagnosticReport"));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
- input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType("Observation?code=OBSCODE,DiagnosticReport?code=DRCODE"));
+ input.addParameter(JpaConstants.PARAM_EXPORT_MDM, true);
+ input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, obsTypeFilter);
ourLog.info(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@@ -329,11 +334,9 @@ public class BulkDataExportProviderTest {
GroupBulkDataExportOptions options = myGroupBulkDataExportOptionsCaptor.getValue();
assertEquals(Constants.CT_FHIR_NDJSON, options.getOutputFormat());
assertThat(options.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
- //TODO GGG eventually, we will support since in group exports
- assertThat(options.getSince(), nullValue());
- //TODO GGG eventually, we will support filters in group exports
- assertThat(options.getFilters(), nullValue());
+ assertThat(options.getSince(), notNullValue());
+ assertThat(options.getFilters(), notNullValue());
assertEquals(GROUP_ID, options.getGroupId().getValue());
- assertFalse(options.isMdm());
+ assertThat(options.isMdm(), is(equalTo(true)));
}
}
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
index 0888fe465aa..94ba91f5fd6 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java
@@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
+import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
+import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
@@ -16,6 +18,9 @@ import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
+import ca.uhn.fhir.jpa.entity.MdmLink;
+import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
+import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.UrlUtil;
@@ -62,6 +67,7 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -84,11 +90,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private JobExplorer myJobExplorer;
@Autowired
- @Qualifier("bulkExportJob")
+ @Qualifier(BatchJobsConfig.BULK_EXPORT_JOB_NAME)
private Job myBulkJob;
@Autowired
- @Qualifier("groupBulkExportJob")
+ @Qualifier(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME)
private Job myGroupBulkJob;
private IIdType myPatientGroupId;
@@ -291,10 +297,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT1\"}"));
- assertEquals(5, nextContents.split("\n").length); // Only female patients
+ assertEquals(7, nextContents.split("\n").length); // Only female patients
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
- assertEquals(10, nextContents.split("\n").length);
+ assertEquals(16, nextContents.split("\n").length);
} else {
fail(next.getResourceType());
}
@@ -344,16 +350,16 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
if ("Patient".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"value\":\"PAT0\""));
- assertEquals(10, nextContents.split("\n").length);
+ assertEquals(17, nextContents.split("\n").length);
} else if ("Observation".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
- assertEquals(10, nextContents.split("\n").length);
+ assertEquals(16, nextContents.split("\n").length);
}else if ("Immunization".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"patient\":{\"reference\":\"Patient/PAT0\"}}\n"));
- assertEquals(10, nextContents.split("\n").length);
+ assertEquals(16, nextContents.split("\n").length);
} else if ("CareTeam".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"CT0\""));
- assertEquals(10, nextContents.split("\n").length);
+ assertEquals(16, nextContents.split("\n").length);
} else if ("Group".equals(next.getResourceType())) {
assertThat(nextContents, containsString("\"id\":\"G0\""));
assertEquals(1, nextContents.split("\n").length);
@@ -485,7 +491,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
public void awaitAllBulkJobCompletions() {
- List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
+ List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.BULK_EXPORT_JOB_NAME, 0, 100);
+ bulkExport.addAll(myJobExplorer.findJobInstancesByJobName(BatchJobsConfig.GROUP_BULK_EXPORT_JOB_NAME, 0, 100));
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
@@ -601,7 +608,133 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} catch (JobParametersInvalidException e) {
// good
}
+ }
+
+ @Test
+ public void testMdmExpansionSuccessfullyExtractsPatients() throws JobParametersInvalidException {
+ createResources();
+
+ // Create a bulk job
+ IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Patient"), null, null, myPatientGroupId, true));
+
+ myBulkDataExportSvc.buildExportFiles();
+ awaitAllBulkJobCompletions();
+
+ IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
+ assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
+ assertThat(jobInfo.getFiles().size(), equalTo(1));
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
+
+ Binary patientExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
+ assertEquals(Constants.CT_FHIR_NDJSON, patientExportContent.getContentType());
+ String nextContents = new String(patientExportContent.getContent(), Constants.CHARSET_UTF8);
+ ourLog.info("Next contents for type {}:\n{}", patientExportContent.getResourceType(), nextContents);
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Patient")));
+
+ //Output contains the entire group, plus the MDM expansion, plus the golden resource
+ assertEquals(11, nextContents.split("\n").length);
+ }
+
+ @Test
+ public void testMdmExpansionWorksForGroupExportOnMatchedPatients() throws JobParametersInvalidException {
+ createResources();
+
+ // Create a bulk job
+ IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization", "Observation"), null, null, myPatientGroupId, true));
+
+ myBulkDataExportSvc.buildExportFiles();
+ awaitAllBulkJobCompletions();
+
+ IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
+ assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson&_type=Observation,Immunization&_groupId=" + myPatientGroupId +"&_mdm=true", jobInfo.getRequest());
+
+ assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
+ assertThat(jobInfo.getFiles().size(), equalTo(2));
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+
+ // Check immunization Content
+ Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
+ assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
+ String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
+ ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+ assertThat(nextContents, is(containsString("IMM0")));
+ assertThat(nextContents, is(containsString("IMM2")));
+ assertThat(nextContents, is(containsString("IMM4")));
+ assertThat(nextContents, is(containsString("IMM6")));
+ assertThat(nextContents, is(containsString("IMM8")));
+ assertThat(nextContents, is(containsString("IMM1")));
+ assertThat(nextContents, is(containsString("IMM3")));
+ assertThat(nextContents, is(containsString("IMM5")));
+ assertThat(nextContents, is(containsString("IMM7")));
+ assertThat(nextContents, is(containsString("IMM9")));
+ assertThat(nextContents, is(containsString("IMM999")));
+
+
+ //Check Observation Content
+ Binary observationExportContent = myBinaryDao.read(jobInfo.getFiles().get(1).getResourceId());
+ assertEquals(Constants.CT_FHIR_NDJSON, observationExportContent.getContentType());
+ nextContents = new String(observationExportContent.getContent(), Constants.CHARSET_UTF8);
+ ourLog.info("Next contents for type {}:\n{}", observationExportContent.getResourceType(), nextContents);
+ assertThat(jobInfo.getFiles().get(1).getResourceType(), is(equalTo("Observation")));
+ assertThat(nextContents, is(containsString("OBS0")));
+ assertThat(nextContents, is(containsString("OBS2")));
+ assertThat(nextContents, is(containsString("OBS4")));
+ assertThat(nextContents, is(containsString("OBS6")));
+ assertThat(nextContents, is(containsString("OBS8")));
+ assertThat(nextContents, is(containsString("OBS1")));
+ assertThat(nextContents, is(containsString("OBS3")));
+ assertThat(nextContents, is(containsString("OBS5")));
+ assertThat(nextContents, is(containsString("OBS7")));
+ assertThat(nextContents, is(containsString("OBS9")));
+ assertThat(nextContents, is(containsString("OBS999")));
+
+ //Ensure that we didn't over-include into non-group-members data.
+ assertThat(nextContents, is(not(containsString("OBS1000"))));
+ }
+
+ @Test
+ public void testGroupBulkExportSupportsTypeFilters() throws JobParametersInvalidException {
+ createResources();
+ Set<String> filters = new HashSet<>();
+
+ //Only get COVID-19 vaccinations
+ filters.add("Immunization?vaccine-code=vaccines|COVID-19");
+
+ GroupBulkDataExportOptions groupBulkDataExportOptions = new GroupBulkDataExportOptions(null, Sets.newHashSet("Immunization"), null, filters, myPatientGroupId, true );
+ IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(groupBulkDataExportOptions);
+
+ GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
+ paramBuilder.setGroupId(myPatientGroupId.getIdPart());
+ paramBuilder.setMdm(true);
+ paramBuilder.setJobUUID(jobDetails.getJobId());
+ paramBuilder.setReadChunkSize(10L);
+
+ JobExecution jobExecution = myBatchJobSubmitter.runJob(myGroupBulkJob, paramBuilder.toJobParameters());
+
+ awaitJobCompletion(jobExecution);
+ IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
+
+ assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
+ assertThat(jobInfo.getFiles().size(), equalTo(1));
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+
+ // Check immunization Content
+ Binary immunizationExportContent = myBinaryDao.read(jobInfo.getFiles().get(0).getResourceId());
+ assertEquals(Constants.CT_FHIR_NDJSON, immunizationExportContent.getContentType());
+ String nextContents = new String(immunizationExportContent.getContent(), Constants.CHARSET_UTF8);
+ ourLog.info("Next contents for type {}:\n{}", immunizationExportContent.getResourceType(), nextContents);
+
+ assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
+ assertThat(nextContents, is(containsString("IMM1")));
+ assertThat(nextContents, is(containsString("IMM3")));
+ assertThat(nextContents, is(containsString("IMM5")));
+ assertThat(nextContents, is(containsString("IMM7")));
+ assertThat(nextContents, is(containsString("IMM9")));
+ assertThat(nextContents, is(containsString("IMM999")));
+
+ assertThat(nextContents, is(not(containsString("Flu"))));
}
private void awaitJobCompletion(JobExecution theJobExecution) {
@@ -615,46 +748,112 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private void createResources() {
Group group = new Group();
group.setId("G0");
+
+ //Manually create a golden record
+ Patient goldenPatient = new Patient();
+ goldenPatient.setId("PAT999");
+ DaoMethodOutcome g1Outcome = myPatientDao.update(goldenPatient);
+ Long goldenPid = myIdHelperService.getPidOrNull(g1Outcome.getResource());
+
+ //Create our golden records' data.
+ createObservationWithIndex(999, g1Outcome.getId());
+ createImmunizationWithIndex(999, g1Outcome.getId());
+ createCareTeamWithIndex(999, g1Outcome.getId());
+
+ //Lets create an observation and an immunization for our golden patient.
+
for (int i = 0; i < 10; i++) {
- Patient patient = new Patient();
- patient.setId("PAT" + i);
- patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
- patient.addName().setFamily("FAM" + i);
- patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
- IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
+ DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
+ IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
+ Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
+
+ //Link the patient to the golden resource
+ linkToGoldenResource(goldenPid, sourcePid);
+
//Only add half the patients to the group.
if (i % 2 == 0 ) {
group.addMember().setEntity(new Reference(patId));
}
- Observation obs = new Observation();
- obs.setId("OBS" + i);
- obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
- obs.setStatus(Observation.ObservationStatus.FINAL);
- obs.getSubject().setReference(patId.getValue());
- myObservationDao.update(obs);
-
- Immunization immunization = new Immunization();
- immunization.setId("IMM" + i);
- immunization.setPatient(new Reference(patId));
- if (i % 2 == 0) {
- CodeableConcept cc = new CodeableConcept();
- cc.addCoding().setSystem("vaccines").setCode("Flu");
- immunization.setVaccineCode(cc);
- } else {
- CodeableConcept cc = new CodeableConcept();
- cc.addCoding().setSystem("vaccines").setCode("COVID-19");
- immunization.setVaccineCode(cc);
- }
- myImmunizationDao.update(immunization);
-
- CareTeam careTeam = new CareTeam();
- careTeam.setId("CT" + i);
- careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
- myCareTeamDao.update(careTeam);
+ //Create this patient's associated data
+ createObservationWithIndex(i, patId);
+ createImmunizationWithIndex(i, patId);
+ createCareTeamWithIndex(i, patId);
}
myPatientGroupId = myGroupDao.update(group).getId();
+
+ //Manually create another golden record
+ Patient goldenPatient2 = new Patient();
+ goldenPatient2.setId("PAT888");
+ DaoMethodOutcome g2Outcome = myPatientDao.update(goldenPatient2);
+ Long goldenPid2 = myIdHelperService.getPidOrNull(g2Outcome.getResource());
+ //Create some non-group patients MDM-linked to a different golden resource. They shouldn't be included in the query.
+ for (int i = 1000; i < 1005; i++) {
+ DaoMethodOutcome patientOutcome = createPatientWithIndex(i);
+ IIdType patId = patientOutcome.getId().toUnqualifiedVersionless();
+ Long sourcePid = myIdHelperService.getPidOrNull(patientOutcome.getResource());
+ linkToGoldenResource(goldenPid2, sourcePid);
+ createObservationWithIndex(i, patId);
+ createImmunizationWithIndex(i, patId);
+ createCareTeamWithIndex(i, patId);
+ }
+ }
+
+ private DaoMethodOutcome createPatientWithIndex(int i) {
+ Patient patient = new Patient();
+ patient.setId("PAT" + i);
+ patient.setGender(i % 2 == 0 ? Enumerations.AdministrativeGender.MALE : Enumerations.AdministrativeGender.FEMALE);
+ patient.addName().setFamily("FAM" + i);
+ patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
+ return myPatientDao.update(patient);
+ }
+
+ private void createCareTeamWithIndex(int i, IIdType patId) {
+ CareTeam careTeam = new CareTeam();
+ careTeam.setId("CT" + i);
+ careTeam.setSubject(new Reference(patId)); // This maps to the "patient" search parameter on CareTeam
+ myCareTeamDao.update(careTeam);
+ }
+
+ private void createImmunizationWithIndex(int i, IIdType patId) {
+ Immunization immunization = new Immunization();
+ immunization.setId("IMM" + i);
+ immunization.setPatient(new Reference(patId));
+ CodeableConcept cc = new CodeableConcept();
+ cc.addCoding().setSystem("vaccines").setCode(i % 2 == 0 ? "Flu" : "COVID-19");
+ immunization.setVaccineCode(cc);
+ myImmunizationDao.update(immunization);
+ }
+
+ private void createObservationWithIndex(int i, IIdType patId) {
+ Observation obs = new Observation();
+ obs.setId("OBS" + i);
+ obs.addIdentifier().setSystem("SYS").setValue("VAL" + i);
+ obs.setStatus(Observation.ObservationStatus.FINAL);
+ obs.getSubject().setReference(patId.getValue());
+ myObservationDao.update(obs);
+ }
+
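+ /**
+ * Manually persists the {@link MdmLink} that MDM matching would normally create, so the
+ * group-export MDM expansion under test can resolve source patients from a golden record.
+ */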
+ private void linkToGoldenResource(Long theGoldenPid, Long theSourcePid) {
+ MdmLink mdmLink = new MdmLink();
+ mdmLink.setCreated(new Date());
+ mdmLink.setMdmSourceType("Patient");
+ mdmLink.setGoldenResourcePid(theGoldenPid);
+ mdmLink.setSourcePid(theSourcePid);
+ mdmLink.setMatchResult(MdmMatchResultEnum.MATCH);
+ mdmLink.setHadToCreateNewGoldenResource(false);
+ mdmLink.setEidMatch(false);
+ mdmLink.setLinkSource(MdmLinkSourceEnum.MANUAL);
+ mdmLink.setUpdated(new Date());
+ mdmLink.setVersion("1");
+ myMdmLinkDao.save(mdmLink);
}
}
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java
index 474403f05c3..7af712c6694 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java
@@ -73,12 +73,10 @@ public class TestR4Config extends BaseJavaConfigR4 {
return new CircularQueueCaptureQueriesListener();
}
-
@Bean
public DataSource dataSource() {
BasicDataSource retVal = new BasicDataSource() {
-
@Override
public Connection getConnection() {
ConnectionWrapper retVal;
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java
index a8c6fab7341..6eaa3bf9027 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/BaseJpaR4Test.java
@@ -22,6 +22,7 @@ import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
+import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import ca.uhn.fhir.jpa.dao.data.IPartitionDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTagDao;
@@ -479,11 +480,13 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
@Autowired
private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired
- private IdHelperService myIdHelperService;
+ protected IdHelperService myIdHelperService;
@Autowired
protected IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
protected ValidationSettings myValidationSettings;
+ @Autowired
+ protected IMdmLinkDao myMdmLinkDao;
@AfterEach()
public void afterCleanupDao() {
@@ -549,6 +552,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
@AfterEach
public void afterPurgeDatabase() {
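+ // MDM links hold references to resource PIDs, so clear them before the resources themselves are purged.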
+ myMdmLinkDao.deleteAll();
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataExportSvc);
}
diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
index 5734eb644a6..3cf09bb4efc 100644
--- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
+++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java
@@ -187,13 +187,12 @@ public class JpaConstants {
/**
* The [id] of the group when $export is called on /Group/[id]/$export
*/
- public static final Object PARAM_EXPORT_GROUP_ID = "_groupId";
+ public static final String PARAM_EXPORT_GROUP_ID = "_groupId";
/**
- * TODO GGG eventually we will support this.
* Whether MDM expansion should be applied during a Group export, expanding the group's members to their MDM-linked resources before the export is performed.
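+	 * e.g. <code>GET [base]/Group/[id]/$export?_type=Immunization&amp;_mdm=true</code>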
*/
-// public static final String PARAM_EXPORT_MDM = "_mdm";
+ public static final String PARAM_EXPORT_MDM = "_mdm";
/**
* Parameter for delete to indicate the deleted resources should also be expunged
diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/TerserUtil.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/TerserUtil.java
index d6515f7e159..1dd411cbdd4 100644
--- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/TerserUtil.java
+++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/TerserUtil.java
@@ -28,6 +28,7 @@ import ca.uhn.fhir.mdm.model.CanonicalEID;
import ca.uhn.fhir.util.FhirTerser;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.slf4j.Logger;
import java.lang.reflect.Method;
import java.util.Collection;
@@ -38,8 +39,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static ca.uhn.fhir.mdm.util.GoldenResourceHelper.FIELD_NAME_IDENTIFIER;
+import static org.slf4j.LoggerFactory.getLogger;
public final class TerserUtil {
+ private static final Logger ourLog = getLogger(TerserUtil.class);
public static final Collection<String> IDS_AND_META_EXCLUDES =
Collections.unmodifiableSet(Stream.of("id", "identifier", "meta").collect(Collectors.toSet()));
@@ -105,6 +108,23 @@ public final class TerserUtil {
}
return !(resourceIdentifier.getAccessor().getValues(theResource).isEmpty());
}
+ /**
+ * Gets all values of the specified field on the given resource.
+ *
+ * @param theFhirContext Context holding resource definition
+ * @param theResource Resource from which the field values are retrieved
+ * @param theFieldName Name of the field whose values are returned
+ * @return Returns the list of values for the field, or null if no field with that name exists on the resource
+ */
+ public static List<IBase> getValues(FhirContext theFhirContext, IBaseResource theResource, String theFieldName) {
+ RuntimeResourceDefinition resourceDefinition = theFhirContext.getResourceDefinition(theResource);
+ BaseRuntimeChildDefinition resourceIdentifier = resourceDefinition.getChildByName(theFieldName);
+ if (resourceIdentifier == null) {
+ ourLog.info("There is no field named {} in Resource {}", theFieldName, resourceDefinition.getName());
+ return null;
+ }
+ return resourceIdentifier.getAccessor().getValues(theResource);
+ }
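+
+ // Usage sketch (illustrative; "patient" and "myFhirContext" are placeholder names):
+ //   List<IBase> names = TerserUtil.getValues(myFhirContext, patient, "name");
+ //   if (names != null) {
+ //       names.forEach(value -> ourLog.info("name value: {}", value));
+ //   }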
/**
* Clones specified composite field (collection). Composite field values must conform to the collections