Partially complete groupBulkExportJob

Tadgh 2021-02-23 13:05:58 -05:00
parent b5b8d27b39
commit 7561889924
6 changed files with 84 additions and 46 deletions

View File

@@ -58,7 +58,8 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
@Override
public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) {
String collect = theResourcePersistentId.stream().map(pid -> pid.getId().toString()).collect(Collectors.joining(","));
ourLog.info("Processing pids {}" + collect);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
@@ -66,7 +67,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null);
ourLog.trace("Loaded resources: {}", outgoing.stream().map(t->t.getIdElement().getValue()).collect(Collectors.joining(", ")));
ourLog.warn("Loaded resources: {}", outgoing.stream().map(t->t.getIdElement().getValue()).collect(Collectors.joining(", ")));
return outgoing;
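The pid log line relies on SLF4J parameterized logging; concatenating the value onto the format string would leave the {} placeholder unfilled. A minimal contrast:

// Broken: the value is concatenated into the format string, so the {}
// placeholder prints literally and is never substituted.
ourLog.info("Processing pids {}" + collect);
// Correct: pass the value as a parameter; SLF4J substitutes it and skips
// the formatting work entirely when INFO logging is disabled.
ourLog.info("Processing pids {}", collect);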

View File

@@ -31,7 +31,6 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
@@ -41,27 +40,24 @@ import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import ca.uhn.fhir.rest.param.StringOrListParam;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.Patient;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.Set;
public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@@ -83,18 +79,12 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private EntityManager myEntityManager;
@Autowired
private MatchUrlService myMatchUrlService;
private List<String> getGroupMemberIds() {
IFhirResourceDao group = myDaoRegistry.getResourceDao("Group");
IBaseResource read = group.read(new IdDt(myGroupId));
FhirTerser fhirTerser = myContext.newTerser();
List<IBaseReference> values = fhirTerser.getValues(read, "Group.member", IBaseReference.class);
return values.stream().map(theIBaseReference -> theIBaseReference.getReferenceElement().getValue()).collect(Collectors.toList());
}
private void loadResourcePids() {
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
if (!jobOpt.isPresent()) {
@@ -102,33 +92,57 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId
return;
}
BulkExportJobEntity jobEntity = jobOpt.get();
ourLog.info("Bulk export starting generation for batch export job: {}", jobEntity);
ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);
//Fetch all the pids given the query.
ISearchBuilder searchBuilder = getSearchBuilder();
SearchParameterMap searchParameterMap = getParameterMap(jobEntity);
IResultIterator resultIterator = searchBuilder.createQuery(
searchParameterMap,
new SearchRuntimeDetails(null, myJobUUID),
null,
RequestPartitionId.allPartitions()
);
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (resultIterator.hasNext()) {
myReadPids.add(resultIterator.next());
}
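// Expand the matched patient pids with the job's _revinclude resources (e.g. Immunization:patient) before exposing them through the iterator.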
Set<ResourcePersistentId> resourcePersistentIds = searchBuilder.loadIncludes(myContext, myEntityManager, myReadPids, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null);
myPidIterator = resourcePersistentIds.iterator();
}
private ISearchBuilder getSearchBuilder() {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
return mySearchBuilderFactory.newSearchBuilder(dao, "Patient", nextTypeClass);
}
@Nonnull
private SearchParameterMap getParameterMap(BulkExportJobEntity jobEntity) {
SearchParameterMap spm = new SearchParameterMap();
myGroupId = "21";
spm.add("_has", new HasParam("Group", "member", "_id", myGroupId));
spm.addRevInclude(new Include("Immunization:patient").toLocked());
// SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def);
spm.addRevInclude(new Include(lookupRevIncludeParameter(myResourceType)).toLocked());
if (jobEntity.getSince() != null) {
spm.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
}
spm.setLoadSynchronous(true);
IResultIterator myResultIterator = sb.createQuery(spm, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
List<ResourcePersistentId> myReadPids = new ArrayList<>();
while (myResultIterator.hasNext()) {
myReadPids.add(myResultIterator.next());
return spm;
}
private String lookupRevIncludeParameter(String theResourceType) {
switch (theResourceType) {
case "Immunization":
return "Immunization:patient";
case "Observation":
return "Observation:patient";
default:
throw new UnsupportedOperationException("You cannot currently do a group bulk export for type " + theResourceType);
}
myPidIterator = myReadPids.iterator();
}
private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) {
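Taken together, the reader's query is: find Patients that are members of the given Group via _has, then rev-include the export's target resource type. A minimal sketch of that assembly, assuming an illustrative group id of 123:

// Sketch of the group-export query built in getParameterMap() above.
SearchParameterMap spm = new SearchParameterMap();
// Match Patients that some Group with _id=123 references through "member".
spm.add("_has", new HasParam("Group", "member", "_id", "123"));
// Pull in resources that reference those patients, e.g. Immunization:patient.
spm.addRevInclude(new Include("Immunization:patient").toLocked());
// Run the query synchronously rather than through the search cache.
spm.setLoadSynchronous(true);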

View File

@@ -64,7 +64,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
private Long myBulkExportCollectionEntityId;
@Value("#{stepExecutionContext['resourceType']}")
private String myReosurceType;
private String myResourceType;
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@@ -116,6 +116,8 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
int count = 0;
for (List<IBaseResource> resourceList : theList) {
for (IBaseResource nextFileResource : resourceList) {
System.out.println("ZOOP");
System.out.println(myParser.setPrettyPrint(true).encodeResourceToString(nextFileResource));
myParser.encodeResourceToWriter(nextFileResource, myWriter);
myWriter.append("\n");
count++;
@@ -124,7 +126,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
Optional<IIdType> createdId = flushToFiles();
if (createdId.isPresent()) {
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myReosurceType);
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType);
}
}
}
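One hazard in the loop above: the debug println calls setPrettyPrint(true) on the shared parser, so the following encodeResourceToWriter emits multi-line JSON and quietly breaks the one-resource-per-line NDJSON format of bulk export files. A minimal sketch of the intended write loop, with pretty-printing explicitly off:

// NDJSON: exactly one compact JSON resource per line.
myParser.setPrettyPrint(false);
for (IBaseResource nextFileResource : resourceList) {
    myParser.encodeResourceToWriter(nextFileResource, myWriter);
    myWriter.append("\n");
}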

View File

@@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
@@ -64,6 +65,7 @@ import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -71,6 +73,7 @@ import java.util.stream.Collectors;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParams;
import static org.apache.commons.lang3.StringUtils.contains;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@@ -102,6 +105,10 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
@Qualifier("bulkExportJob")
private org.springframework.batch.core.Job myBulkExportJob;
@Autowired
@Qualifier("groupBulkExportJob")
private org.springframework.batch.core.Job myGroupBulkExportJob;
private final int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
/**
@@ -125,10 +132,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
return;
}
String jobUuid = jobToProcessOpt.get().getJobId();
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
boolean isForGroupExport = containsGroupId(bulkExportJobEntity.getRequest());
try {
processJob(jobUuid);
processJob(jobUuid, isForGroupExport);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@@ -144,6 +153,15 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private boolean containsGroupId(String theRequestString) {
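// The job entity stores the original export request URL; a _groupId query parameter marks the job as a group export.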
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
if (strings != null && strings.length > 0) {
return true;
} else {
return false;
}
}
/**
@@ -193,22 +211,24 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid) {
private void processJob(String theJobUuid, boolean theIsGroupRequest) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.addLong("readChunkSize", READ_CHUNK_SIZE)
.toJobParameters();
.addLong("readChunkSize", READ_CHUNK_SIZE).toJobParameters();
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
myJobSubmitter.runJob(myBulkExportJob, parameters);
if (theIsGroupRequest) {
myJobSubmitter.runJob(myGroupBulkExportJob, parameters);
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters);
}
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
}
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
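As an aside, the group-detection helper reduces to a key lookup on the parsed query string; a more compact equivalent, assuming the same UrlUtil and JpaConstants helpers used above:

private boolean containsGroupId(String theRequestString) {
    // parseQueryString yields parameter name -> values; presence of
    // _groupId (with at least one value) marks a group export.
    String[] values = UrlUtil.parseQueryString(theRequestString).get(JpaConstants.PARAM_EXPORT_GROUP_ID);
    return values != null && values.length > 0;
}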

View File

@@ -65,6 +65,7 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -524,7 +525,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
//Add the UUID to the job
//TODO GGG START HERE
GroupBulkExportJobParametersBuilder paramBuilder = new GroupBulkExportJobParametersBuilder();
paramBuilder.setGroupId(myPatientGroupId.getIdPart());
paramBuilder.setJobUUID(jobDetails.getJobId());
@@ -538,7 +538,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
assertThat(jobInfo.getFiles().size(), equalTo(1));
assertThat(jobInfo.getFiles().get(0).getResourceType(), is(equalTo("Immunization")));
}
@Test

View File

@@ -278,18 +278,18 @@ public class FhirResourceDaoR4LegacySearchBuilderTest extends BaseJpaR4Test {
Group group = new Group();
group.addMember().setEntity(new Reference(patientId));
Long daoMethodOutcome = myGroupDao.create(group).getId().getIdPartAsLong();
myGroupDao.create(group).getId().getIdPartAsLong();
Immunization immunization = new Immunization();
immunization.setPatient(new Reference(patientId));
String immunizationId = myImmunizationDao.create(immunization).getId().toUnqualifiedVersionless().getValue();
String criteria = "?_has:Group:member:_id="+ daoMethodOutcome + "&_revinclude=Immunization:patient";
// String criteria = "?_has:Group:member:_id="+ daoMethodOutcome + "&_revinclude=Immunization:patient";
String criteria = "?_revinclude=Immunization:patient";
//TODO GGG the matchUrlService doesn't translate rev includes!
SearchParameterMap searchParameterMap = myMatchUrlService.translateMatchUrl(criteria, myFhirCtx.getResourceDefinition(Patient.class));
searchParameterMap.addRevInclude(new Include("Immunization:patient").toLocked());
searchParameterMap.setLoadSynchronous(true);
IBundleProvider search = myPatientDao.search(searchParameterMap);
List<String> strings = toUnqualifiedVersionlessIdValues(search);
assertThat(strings, hasItems(patientId, immunizationId));
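The commented-out criteria and the TODO point at a real limitation: translateMatchUrl understands _has, but drops _revinclude, so the rev-include has to be added to the map programmatically. The workaround pattern, with an illustrative group id:

// Parse what translateMatchUrl supports, then add the _revinclude
// (which it does not translate) by hand.
SearchParameterMap map = myMatchUrlService.translateMatchUrl(
	"?_has:Group:member:_id=123", myFhirCtx.getResourceDefinition(Patient.class));
map.addRevInclude(new Include("Immunization:patient").toLocked());
map.setLoadSynchronous(true);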