From 75618899248ddc0417498008df14e4b26cdcc9a8 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Tue, 23 Feb 2021 13:05:58 -0500 Subject: [PATCH] Partially complete groupbulkexportjob --- .../PidToIBaseResourceProcessor.java | 5 +- .../jpa/bulk/job/GroupBulkItemReader.java | 74 +++++++++++-------- .../jpa/bulk/job/ResourceToFileWriter.java | 6 +- .../jpa/bulk/svc/BulkDataExportSvcImpl.java | 34 +++++++-- .../jpa/bulk/BulkDataExportSvcImplR4Test.java | 5 +- ...rResourceDaoR4LegacySearchBuilderTest.java | 6 +- 6 files changed, 84 insertions(+), 46 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java index 8593f113166..d2e2b0466a1 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch/processors/PidToIBaseResourceProcessor.java @@ -58,7 +58,8 @@ public class PidToIBaseResourceProcessor implements ItemProcessor process(List theResourcePersistentId) { - + String collect = theResourcePersistentId.stream().map(pid -> pid.getId().toString()).collect(Collectors.joining(",")); + ourLog.info("Processing pids {}" + collect); IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType); Class resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); @@ -66,7 +67,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor outgoing = new ArrayList<>(); sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null); - ourLog.trace("Loaded resources: {}", outgoing.stream().map(t->t.getIdElement().getValue()).collect(Collectors.joining(", "))); + ourLog.warn("Loaded resources: {}", outgoing.stream().map(t->t.getIdElement().getValue()).collect(Collectors.joining(", "))); return outgoing; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index dc9314dd798..cb3febd5d02 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -31,7 +31,6 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder; import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; -import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; @@ -41,27 +40,24 @@ import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.param.DateRangeParam; import ca.uhn.fhir.rest.param.HasParam; -import ca.uhn.fhir.rest.param.ReferenceOrListParam; -import ca.uhn.fhir.rest.param.ReferenceParam; -import ca.uhn.fhir.rest.param.StringOrListParam; import ca.uhn.fhir.util.FhirTerser; import ca.uhn.fhir.util.UrlUtil; import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.r4.model.Immunization; -import org.hl7.fhir.r4.model.Patient; import org.slf4j.Logger; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import javax.annotation.Nonnull; +import javax.persistence.EntityManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.Set; public class GroupBulkItemReader implements ItemReader> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @@ -83,18 +79,12 @@ public class GroupBulkItemReader implements ItemReader getGroupMemberIds() { - IFhirResourceDao group = myDaoRegistry.getResourceDao("Group"); - IBaseResource read = group.read(new IdDt(myGroupId)); - FhirTerser fhirTerser = myContext.newTerser(); - List values = fhirTerser.getValues(read, "Group.member", IBaseReference.class); - return values.stream().map(theIBaseReference -> theIBaseReference.getReferenceElement().getValue()).collect(Collectors.toList()); - } - private void loadResourcePids() { Optional jobOpt = myBulkExportJobDao.findByJobId(myJobUUID); if (!jobOpt.isPresent()) { @@ -102,33 +92,57 @@ public class GroupBulkItemReader implements ItemReader myReadPids = new ArrayList<>(); + while (resultIterator.hasNext()) { + myReadPids.add(resultIterator.next()); + } + Set resourcePersistentIds = searchBuilder.loadIncludes(myContext, myEntityManager, myReadPids, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null); + myPidIterator = resourcePersistentIds.iterator(); + } + + private ISearchBuilder getSearchBuilder() { IFhirResourceDao dao = myDaoRegistry.getResourceDao("Patient"); - - ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID); - RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); Class nextTypeClass = def.getImplementingClass(); - ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass); + return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass); + } + @Nonnull + private SearchParameterMap getParameterMap(BulkExportJobEntity jobEntity) { SearchParameterMap spm = new SearchParameterMap(); - myGroupId = "21"; spm.add("_has", new HasParam("Group", "member", "_id", myGroupId)); - spm.addRevInclude(new Include("Immunization:patient").toLocked()); - - -// SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def); + spm.addRevInclude(new Include(lookupRevIncludeParameter(myResourceType)).toLocked()); if (jobEntity.getSince() != null) { spm.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null)); } spm.setLoadSynchronous(true); - IResultIterator myResultIterator = sb.createQuery(spm, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions()); - List myReadPids = new ArrayList<>(); - while (myResultIterator.hasNext()) { - myReadPids.add(myResultIterator.next()); + return spm; + } + + private String lookupRevIncludeParameter(String theResourceType) { + switch (theResourceType) { + case "Immunization": + return "Immunization:patient"; + case "Observation": + return "Observation:patient"; + default: + throw new UnsupportedOperationException("You cannot currently do a group bulk export for type " + theResourceType); + } - myPidIterator = myReadPids.iterator(); } private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceToFileWriter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceToFileWriter.java index 74468627856..066954f9133 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceToFileWriter.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/ResourceToFileWriter.java @@ -64,7 +64,7 @@ public class ResourceToFileWriter implements ItemWriter> { private Long myBulkExportCollectionEntityId; @Value("#{stepExecutionContext['resourceType']}") - private String myReosurceType; + private String myResourceType; private IFhirResourceDao myBinaryDao; @@ -116,6 +116,8 @@ public class ResourceToFileWriter implements ItemWriter> { int count = 0; for (List resourceList : theList) { for (IBaseResource nextFileResource : resourceList) { + System.out.println("ZOOP"); + System.out.println(myParser.setPrettyPrint(true).encodeResourceToString(nextFileResource)); myParser.encodeResourceToWriter(nextFileResource, myWriter); myWriter.append("\n"); count++; @@ -124,7 +126,7 @@ public class ResourceToFileWriter implements ItemWriter> { Optional createdId = flushToFiles(); if (createdId.isPresent()) { - ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myReosurceType); + ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType); } } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java index 991bc95bf9f..4a82f5a741c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java @@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import ca.uhn.fhir.util.UrlUtil; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IIdType; @@ -64,6 +65,7 @@ import javax.annotation.PostConstruct; import javax.transaction.Transactional; import java.util.Date; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -71,6 +73,7 @@ import java.util.stream.Collectors; import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam; import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams; +import static org.apache.commons.lang3.StringUtils.contains; import static org.apache.commons.lang3.StringUtils.isNotBlank; public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @@ -102,6 +105,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { @Qualifier("bulkExportJob") private org.springframework.batch.core.Job myBulkExportJob; + @Autowired + @Qualifier("groupBulkExportJob") + private org.springframework.batch.core.Job myGroupBulkExportJob; + private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR); /** @@ -125,10 +132,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { return; } - String jobUuid = jobToProcessOpt.get().getJobId(); + BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get(); + String jobUuid = bulkExportJobEntity.getJobId(); + boolean isForGroupExport = containsGroupId(bulkExportJobEntity.getRequest()); try { - processJob(jobUuid); + processJob(jobUuid, isForGroupExport); } catch (Exception e) { ourLog.error("Failure while preparing bulk export extract", e); myTxTemplate.execute(t -> { @@ -144,6 +153,15 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } } + private boolean containsGroupId(String theRequestString) { + Map stringMap = UrlUtil.parseQueryString(theRequestString); + String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID); + if (strings != null && strings.length > 0) { + return true; + } else { + return false; + } + } /** @@ -193,22 +211,24 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } - private void processJob(String theJobUuid) { + private void processJob(String theJobUuid, boolean theIsGroupRequest) { JobParameters parameters = new JobParametersBuilder() .addString("jobUUID", theJobUuid) - .addLong("readChunkSize", READ_CHUNK_SIZE) - .toJobParameters(); + .addLong("readChunkSize", READ_CHUNK_SIZE).toJobParameters(); ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid); try { - myJobSubmitter.runJob(myBulkExportJob, parameters); + if (theIsGroupRequest) { + myJobSubmitter.runJob(myGroupBulkExportJob, parameters); + } else { + myJobSubmitter.runJob(myBulkExportJob, parameters); + } } catch (JobParametersInvalidException theE) { ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage()); } } - @SuppressWarnings("unchecked") private IFhirResourceDao getBinaryDao() { return myDaoRegistry.getResourceDao("Binary"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java index 7c0a780796e..bd1a5cc8dad 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportSvcImplR4Test.java @@ -65,6 +65,7 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; @@ -524,7 +525,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { //Add the UUID to the job - //TODO GGG START HERE GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder(); paramBuilder.setGroupId(myPatientGroupId.getIdPart()); paramBuilder.setJobUUID(jobDetails.getJobId()); @@ -538,7 +538,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test { IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE)); - assertThat(jobInfo.getFiles().size(), equalTo(2)); + assertThat(jobInfo.getFiles().size(), equalTo(1)); + assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization"))); } @Test diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java index 8613fd97160..21d96542b35 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4LegacySearchBuilderTest.java @@ -278,18 +278,18 @@ public class FhirResourceDaoR4LegacySearchBuilderTest extends BaseJpaR4Test { Group group = new Group(); group.addMember().setEntity(new Reference(patientId)); - Long daoMethodOutcome = myGroupDao.create(group).getId().getIdPartAsLong(); + myGroupDao.create(group).getId().getIdPartAsLong(); Immunization immunization = new Immunization(); immunization.setPatient(new Reference(patientId)); String immunizationId = myImmunizationDao.create(immunization).getId().toUnqualifiedVersionless().getValue(); - String criteria = "?_has:Group:member:_id="+ daoMethodOutcome + "&_revinclude=Immunization:patient"; +// String criteria = "?_has:Group:member:_id="+ daoMethodOutcome + "&_revinclude=Immunization:patient"; + String criteria = "?_revinclude=Immunization:patient"; //TODO GGG the matchUrlService _doesnt translate rev includes! SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(criteria, myFhirCtx.getResourceDefinition(Patient.class)); searchParameterMap.addRevInclude(new Include("Immunization:patient").toLocked()); searchParameterMap.setLoadSynchronous(true); - IBundleProvider search = myPatientDao.search(searchParameterMap); List strings = toUnqualifiedVersionlessIdValues(search); assertThat(strings, hasItems(patientId, immunizationId));