Add chunking
parent 7561889924
commit 077225b87c
@@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.bulk.job;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.RuntimeResourceDefinition;
+import ca.uhn.fhir.context.RuntimeSearchParam;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
@@ -35,14 +36,12 @@ import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
-import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
+import ca.uhn.fhir.jpa.util.QueryChunker;
 import ca.uhn.fhir.model.api.Include;
 import ca.uhn.fhir.model.primitive.IdDt;
 import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
 import ca.uhn.fhir.rest.param.DateRangeParam;
 import ca.uhn.fhir.rest.param.HasParam;
 import ca.uhn.fhir.util.FhirTerser;
 import ca.uhn.fhir.util.UrlUtil;
 import org.hl7.fhir.instance.model.api.IBaseReference;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.slf4j.Logger;
 import org.springframework.batch.item.ItemReader;
@@ -58,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;

 public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@@ -68,8 +68,6 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    private String myJobUUID;
    @Value("#{stepExecutionContext['resourceType']}")
    private String myResourceType;
-   @Value("#{jobParameters['groupId']}")
-   private String myGroupId;

    @Autowired
    private IBulkExportJobDao myBulkExportJobDao;
@@ -82,9 +80,6 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    @Autowired
    private EntityManager myEntityManager;

-   @Autowired
-   private MatchUrlService myMatchUrlService;
-
    private void loadResourcePids() {
       Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
       if (!jobOpt.isPresent()) {
@@ -94,10 +89,11 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
       BulkExportJobEntity jobEntity = jobOpt.get();
       ourLog.info("Group Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", jobEntity, myResourceType, myJobUUID);

+      //Fetch all the pids given the query.
       ISearchBuilder searchBuilder = getSearchBuilder();
-      SearchParameterMap searchParameterMap = getParameterMap(jobEntity);
+
+      //Build a complex-ish _has query with a revinclude, which allows lookup by group membership
+      SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);

       IResultIterator resultIterator = searchBuilder.createQuery(
          searchParameterMap,
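For readers tracing the query being built above: `_has:Group:member:_id` selects the Patient resources that a given Group references through its `member` element, and the revinclude then pulls in all resources of the export's type that reference those Patients. A rough REST equivalent of the resulting map (illustrative only; the group id `G1`, the resource type `Observation`, and the date are assumed values, not taken from this commit):

    GET [base]/Patient?_has:Group:member:_id=G1&_revinclude=Observation:patient&_lastUpdated=ge2021-01-01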
@@ -110,10 +106,19 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
       while (resultIterator.hasNext()) {
          myReadPids.add(resultIterator.next());
       }
-      Set<ResourcePersistentId> resourcePersistentIds = searchBuilder.loadIncludes(myContext, myEntityManager, myReadPids, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null);
-      myPidIterator = resourcePersistentIds.iterator();
+
+      //Given that databases explode when you have an IN clause with >1000 resources, we use the QueryChunker to break this into multiple queries.
+      List<ResourcePersistentId> revIncludePids = new ArrayList<>();
+      QueryChunker<ResourcePersistentId> chunker = new QueryChunker<>();
+
+      chunker.chunk(myReadPids, pidChunk -> {
+         revIncludePids.addAll(searchBuilder.loadIncludes(myContext, myEntityManager, pidChunk, searchParameterMap.getRevIncludes(), true, searchParameterMap.getLastUpdated(), myJobUUID, null));
+      });
+
+      myPidIterator = revIncludePids.iterator();
    }

+   //For all group revinclude queries, you need to perform the search on the Patient DAO, which is why this is hardcoded here.
    private ISearchBuilder getSearchBuilder() {
       IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao("Patient");
       RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
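The chunking introduced above works around the limit (roughly 1,000 elements on some databases) on the number of values a SQL IN clause can hold. QueryChunker's implementation is not part of this diff; the following is a minimal standalone sketch of the pattern it is used for here, with an assumed chunk size of 800:

import java.util.List;
import java.util.function.Consumer;

public final class ChunkSketch {
   // The chunk size is an assumption for this sketch, chosen to stay below the ~1,000-element IN clause limit.
   private static final int CHUNK_SIZE = 800;

   public static <T> void chunk(List<T> theList, Consumer<List<T>> theBatchConsumer) {
      for (int i = 0; i < theList.size(); i += CHUNK_SIZE) {
         int toIndex = Math.min(i + CHUNK_SIZE, theList.size());
         // subList returns a view of the backing list; consumers that retain a chunk should copy it.
         theBatchConsumer.accept(theList.subList(i, toIndex));
      }
   }
}

Each callback then receives at most CHUNK_SIZE pids, so every IN clause generated by loadIncludes() stays within the limit no matter how many pids the group query returned.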
@@ -122,41 +127,52 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    }

    @Nonnull
-   private SearchParameterMap getParameterMap(BulkExportJobEntity jobEntity) {
-      SearchParameterMap spm = new SearchParameterMap();
-      spm.add("_has", new HasParam("Group", "member", "_id", myGroupId));
-      spm.addRevInclude(new Include(lookupRevIncludeParameter(myResourceType)).toLocked());
+   private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
+      SearchParameterMap searchParameterMap = new SearchParameterMap();
+      String groupIdFromRequest = getGroupIdFromRequest(jobEntity);
+      searchParameterMap.add("_has", new HasParam("Group", "member", "_id", groupIdFromRequest));
+
+      String revIncludeString = buildRevIncludeString();
+      searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
+
       if (jobEntity.getSince() != null) {
-         spm.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
+         searchParameterMap.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));
       }
-      spm.setLoadSynchronous(true);
-      return spm;
+      searchParameterMap.setLoadSynchronous(true);
+      return searchParameterMap;
    }

-   private String lookupRevIncludeParameter(String theResourceType) {
-      switch (theResourceType) {
-         case "Immunization":
-            return "Immunization:patient";
-         case "Observation":
-            return "Observation:patient";
-         default:
-            throw new UnsupportedOperationException("You cannot currently do a group bulk export for type " + theResourceType);
-      }
-   }
+   /**
+    * Given the resource type of the job, fetch its patient compartment name, formatted for usage in an Include.
+    * e.g. Immunization -> Immunization:patient
+    *
+    * @return A string which can be dropped directly into an Include.
+    */
+   private String buildRevIncludeString() {
+      RuntimeResourceDefinition runtimeResourceDefinition = myContext.getResourceDefinition(myResourceType);
+      List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
+      if (searchParams == null || searchParams.size() == 0) {
+         String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as it contains no Patient compartment", myResourceType);
+         throw new IllegalArgumentException(errorMessage);
+      } else {
+         //The reason we grab the first here is that even if there _are_ multiple search params, they end up pointing to the same patient compartment.
+         //So we can safely just grab the first.
+         RuntimeSearchParam runtimeSearchParam = searchParams.get(0);
+         String includeString = runtimeResourceDefinition.getName() + ":" + runtimeSearchParam.getName();
+         return includeString;
+      }
+   }

-   private SearchParameterMap createSearchParameterMapFromTypeFilter(BulkExportJobEntity theJobEntity, RuntimeResourceDefinition theDef) {
-      SearchParameterMap map = new SearchParameterMap();
+   private String getGroupIdFromRequest(BulkExportJobEntity theJobEntity) {
       Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(theJobEntity.getRequest());
-      String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
-      if (typeFilters != null) {
-         Optional<String> filter = Arrays.stream(typeFilters).filter(t -> t.startsWith(myResourceType + "?")).findFirst();
-         if (filter.isPresent()) {
-            String matchUrl = filter.get();
-            map = myMatchUrlService.translateMatchUrl(matchUrl, theDef);
-         }
-      }
-      return map;
+      String[] groupId = requestUrl.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
+      if (groupId != null) {
+         return Arrays.stream(groupId).collect(Collectors.joining(","));
+      } else {
+         throw new IllegalStateException("You cannot run a Group export job without a " + JpaConstants.PARAM_EXPORT_GROUP_ID + " parameter as part of the request.");
+      }
    }

    @Override
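buildRevIncludeString() replaces the old hardcoded switch with a metadata lookup, so any resource type that has a Patient compartment search parameter becomes eligible for group export. A standalone sketch of that lookup (illustrative only; assumes an R4 context, and uses only calls that already appear in this diff):

import java.util.List;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;

public class CompartmentIncludeDemo {
   public static void main(String[] args) {
      FhirContext ctx = FhirContext.forR4();
      RuntimeResourceDefinition def = ctx.getResourceDefinition("Immunization");
      List<RuntimeSearchParam> params = def.getSearchParamsForCompartmentName("Patient");
      // Even when several compartment params exist, they point at the same patient, so the first one suffices.
      System.out.println(def.getName() + ":" + params.get(0).getName());
      // Per the Javadoc above, this prints "Immunization:patient".
   }
}

Similarly, getGroupIdFromRequest() now re-derives the group id from the job's stored request URL rather than from a Spring Batch job parameter, which is why the injected myGroupId field is deleted earlier in this diff. A hedged sketch of that parse step; the request string is hypothetical, and "_groupId" is an assumption about the value of JpaConstants.PARAM_EXPORT_GROUP_ID:

import java.util.Map;

import ca.uhn.fhir.util.UrlUtil;

public class GroupIdParseDemo {
   public static void main(String[] args) {
      // Hypothetical request URL as it might be stored on a BulkExportJobEntity.
      String request = "http://localhost:8000/$export?_type=Observation&_groupId=G1";
      Map<String, String[]> params = UrlUtil.parseQueryStrings(request);
      // Prints "G1"; getGroupIdFromRequest() joins repeated values with ",".
      System.out.println(params.get("_groupId")[0]);
   }
}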
@@ -571,7 +571,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
       patient.addName().setFamily("FAM" + i);
       patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
       IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
-      group.addMember().setEntity(new Reference(patId));
+      //Only add half the patients to the group.
+      if (i % 2 == 0) {
+         group.addMember().setEntity(new Reference(patId));
+      }

       Observation obs = new Observation();
       obs.setId("OBS" + i);