Add post-fetch filtering to bulk export (#4772)

* Add post-fetch filtering to bulk export

* Add changelog

* Add validator

* Test fixes

* Test fix
This commit is contained in:
James Agnew 2023-04-26 17:05:03 -04:00 committed by GitHub
parent 6b9af3291e
commit 855e10a62d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 409 additions and 116 deletions

View File

@ -0,0 +1,10 @@
---
type: add
issue: 4772
backport: 6.4.5
title: "A new optional parameter called `_typePostFetchFilterUrl` has been added to the
Bulk Export `$export`
operation parameters. This parameter allows filters to be specified that will be applied
in-memory to the resources after they have been initially fetched from the database. This can
be used to allow complex filters which only remove small numbers of resources to be
efficiently applied against large datasets."

View File

@ -196,6 +196,10 @@ public class JpaConstants {
* Parameter for the $export operation
*/
public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter";
/**
* Parameter for the $export operation: filter URL(s) that are applied in-memory to
* resources after they have been fetched
*/
public static final String PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL = "_typePostFetchFilterUrl";
/**
* Parameter for the $export operation
*/

View File

@ -67,6 +67,7 @@ import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -190,6 +191,7 @@ public class BulkDataExportProviderTest {
String patientResource = "Patient";
String practitionerResource = "Practitioner";
String filter = "Patient?identifier=foo";
String postFetchFilter = "Patient?_tag=foo";
when(myJobRunner.startNewJob(any()))
.thenReturn(createJobStartResponse());
@ -200,6 +202,7 @@ public class BulkDataExportProviderTest {
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType(patientResource + ", " + practitionerResource));
input.addParameter(JpaConstants.PARAM_EXPORT_SINCE, now);
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_FILTER, new StringType(filter));
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL, new StringType(postFetchFilter));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@ -236,6 +239,7 @@ public class BulkDataExportProviderTest {
assertEquals(Constants.CT_FHIR_NDJSON, params.getOutputFormat());
assertNotNull(params.getStartDate());
assertTrue(params.getFilters().contains(filter));
assertThat(params.getPostFetchFilterUrls(), contains("Patient?_tag=foo"));
}
@Test

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.collect.Sets;
import org.apache.commons.io.LineIterator;
@ -26,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.StringReader;
import java.util.Collections;
@ -41,6 +43,7 @@ import static ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsInlineTest.createSearc
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -747,6 +750,113 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
}
}
/**
 * A post-fetch filter URL without any '?' must be rejected when the job is started.
 */
@Test
public void testValidateParameters_InvalidPostFetch_NoParams() {
	// Setup
	final String expectedMessage =
		"Invalid post-fetch filter URL, must be in the format [resourceType]?[parameters]: foo";
	final BulkDataExportOptions exportOptions = createOptionsWithPostFetchFilterUrl("foo");

	try {
		// Test
		myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(exportOptions));
		fail();
	} catch (InvalidRequestException e) {
		// Verify
		assertThat(e.getMessage(), containsString(expectedMessage));
	}
}
/**
 * A post-fetch filter URL that ends right after the '?' must be rejected when the job is started.
 */
@Test
public void testValidateParameters_InvalidPostFetch_NoParamsAfterQuestionMark() {
	// Setup
	final String expectedMessage =
		"Invalid post-fetch filter URL, must be in the format [resourceType]?[parameters]: Patient?";
	final BulkDataExportOptions exportOptions = createOptionsWithPostFetchFilterUrl("Patient?");

	try {
		// Test
		myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(exportOptions));
		fail();
	} catch (InvalidRequestException e) {
		// Verify
		assertThat(e.getMessage(), containsString(expectedMessage));
	}
}
/**
 * A post-fetch filter URL naming an unknown resource type must be rejected when the job is started.
 */
@Test
public void testValidateParameters_InvalidPostFetch_InvalidResourceType() {
	// Setup
	final String expectedMessage = "Invalid post-fetch filter URL, unknown resource type: Foo";
	final BulkDataExportOptions exportOptions = createOptionsWithPostFetchFilterUrl("Foo?active=true");

	try {
		// Test
		myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(exportOptions));
		fail();
	} catch (InvalidRequestException e) {
		// Verify
		assertThat(e.getMessage(), containsString(expectedMessage));
	}
}
/**
 * A post-fetch filter URL using a chained parameter must be rejected when the job is started.
 */
@Test
public void testValidateParameters_InvalidPostFetch_UnsupportedParam() {
	// Setup
	final String expectedMessage = "Chained parameters are not supported";
	final BulkDataExportOptions exportOptions =
		createOptionsWithPostFetchFilterUrl("Observation?subject.identifier=blah");

	try {
		// Test
		myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(exportOptions));
		fail();
	} catch (InvalidRequestException e) {
		// Verify
		assertThat(e.getMessage(), containsString(expectedMessage));
	}
}
/**
 * A post-fetch filter URL using a parameter name the resource type does not define must be
 * rejected when the job is started.
 */
@Test
public void testValidateParameters_InvalidPostFetch_UnknownParam() {
	// Setup
	final String expectedPrefix = "Invalid post-fetch filter URL.";
	final String expectedReason = "Resource type Observation does not have a parameter with name: foo";
	final BulkDataExportOptions exportOptions = createOptionsWithPostFetchFilterUrl("Observation?foo=blah");

	try {
		// Test
		myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(exportOptions));
		fail();
	} catch (InvalidRequestException e) {
		// Verify
		assertThat(e.getMessage(), containsString(expectedPrefix));
		assertThat(e.getMessage(), containsString(expectedReason));
	}
}
/**
 * Creates SYSTEM-style, NDJSON export options for the Patient resource type with no
 * type filters and exactly one post-fetch filter URL.
 *
 * @param postFetchUrl the single post-fetch filter URL to place on the options
 * @return the populated options
 */
@Nonnull
private static BulkDataExportOptions createOptionsWithPostFetchFilterUrl(String postFetchUrl) {
	final BulkDataExportOptions retVal = new BulkDataExportOptions();

	final Set<String> resourceTypes = Set.of("Patient");
	retVal.setResourceTypes(resourceTypes);
	retVal.setFilters(new HashSet<>());

	retVal.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
	retVal.setOutputFormat(Constants.CT_FHIR_NDJSON);

	final Set<String> postFetchFilterUrls = Set.of(postFetchUrl);
	retVal.setPostFetchFilterUrls(postFetchFilterUrls);

	return retVal;
}
/**
 * Supplies two sets of resource type names: Patient+Group, and Patient+Group+Device.
 */
private static Stream<Set<String>> bulkExportOptionsResourceTypes() {
	final Set<String> patientAndGroup = Set.of("Patient", "Group");
	final Set<String> patientGroupAndDevice = Set.of("Patient", "Group", "Device");
	return Stream.of(patientAndGroup, patientGroupAndDevice);
}

View File

@ -10,7 +10,9 @@ import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
@ -18,10 +20,12 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
@ -101,4 +105,96 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test {
}
/**
* Verifies that the expand-resources step drops resources that do not match a post-fetch
* filter URL declared for their resource type: of the ten patients handed to the step, only
* the five tagged {@code http://tag-system|tag-value} should be passed to the data sink.
*/
@Test
public void testPostFetchFiltering() {
// matchingIds: patients expected to survive filtering; allIds: every patient handed to the step
List<Long> matchingIds = new ArrayList<>();
List<Long> allIds = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// Create a patient that will match the post-fetch filtering
Patient matchingPatient = new Patient();
matchingPatient.getMeta().addTag().setSystem("http://tag-system").setCode("tag-value");
Long matchingId = myPatientDao.create(matchingPatient, mySrd).getId().getIdPartAsLong();
matchingIds.add(matchingId);
allIds.add(matchingId);
// Create a patient that will not match the post-fetch filtering
// (same tag system, different tag code)
Patient nonMatchingPatient = new Patient();
nonMatchingPatient.setActive(true);
nonMatchingPatient.getMeta().addTag().setSystem("http://tag-system").setCode("other-tag-value");
Long nonMatchingId = myPatientDao.create(nonMatchingPatient, mySrd).getId().getIdPartAsLong();
allIds.add(nonMatchingId);
}
// Setup: build the step input (all created patient IDs) and job parameters with one Patient filter
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(allIds.stream().map(t -> new BatchResourceId().setResourceType("Patient").setId(Long.toString(t))).toList());
BulkExportJobParameters params = new BulkExportJobParameters();
params.setPostFetchFilterUrls(List.of("Patient?_tag=http://tag-system|tag-value"));
JobInstance jobInstance = new JobInstance();
String chunkId = "ABC";
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId);
// Test
myCaptureQueriesListener.clear();
myExpandResourcesStep.run(details, mySink);
// Verify: exactly one chunk is emitted, and it contains only the matching patients
verify(mySink, times(1)).accept(myWorkChunkCaptor.capture());
List<Long> resourceIds = myWorkChunkCaptor
.getValue()
.getStringifiedResources()
.stream()
.map(t -> myFhirContext.newJsonParser().parseResource(t).getIdElement().getIdPartAsLong())
.toList();
assertThat(resourceIds.toString(), resourceIds, containsInAnyOrder(matchingIds.toArray(new Long[0])));
}
/**
* Verifies that a post-fetch filter URL declared for a different resource type
* (Observation) does not remove anything from a chunk of Patient resources.
*/
@Test
public void testPostFetchFiltering_NoFiltersForGivenResourceType() {
List<Long> allIds = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// Create a patient; the only filter used below targets Observation, so every
// patient created here is expected to be emitted
Patient matchingPatient = new Patient();
matchingPatient.setActive(true);
Long matchingId = myPatientDao.create(matchingPatient, mySrd).getId().getIdPartAsLong();
allIds.add(matchingId);
}
// Setup: build the step input (all created patient IDs) and job parameters with an Observation-only filter
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(allIds.stream().map(t -> new BatchResourceId().setResourceType("Patient").setId(Long.toString(t))).toList());
BulkExportJobParameters params = new BulkExportJobParameters();
params.setPostFetchFilterUrls(List.of("Observation?status=final"));
JobInstance jobInstance = new JobInstance();
String chunkId = "ABC";
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId);
// Test
myCaptureQueriesListener.clear();
myExpandResourcesStep.run(details, mySink);
// Verify: exactly one chunk is emitted, and it contains every patient
verify(mySink, times(1)).accept(myWorkChunkCaptor.capture());
List<Long> resourceIds = myWorkChunkCaptor
.getValue()
.getStringifiedResources()
.stream()
.map(t -> myFhirContext.newJsonParser().parseResource(t).getIdElement().getIdPartAsLong())
.toList();
assertThat(resourceIds.toString(), resourceIds, containsInAnyOrder(allIds.toArray(new Long[0])));
}
}

View File

@ -1,64 +0,0 @@
package ca.uhn.fhir.jpa.testutil;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.SearchParameterUtil;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Test helper that converts {@link BulkDataExportOptions} into {@link BulkExportParameters}.
 */
public class BulkExportBatch2TestUtils {

	/**
	 * Builds bulk export parameters (constructed with
	 * {@link Batch2JobDefinitionConstants#BULK_EXPORT}) that mirror the supplied options.
	 * When the options name no resource types, the list is derived from the context and
	 * the export style instead.
	 *
	 * @param theCtx     context used to enumerate resource types when the options name none
	 * @param theOptions the export options to convert
	 * @return the populated parameters
	 */
	public static BulkExportParameters getBulkExportParametersFromOptions(FhirContext theCtx,
																								 BulkDataExportOptions theOptions) {
		BulkExportParameters retVal = new BulkExportParameters(Batch2JobDefinitionConstants.BULK_EXPORT);

		retVal.setStartDate(theOptions.getSince());
		retVal.setOutputFormat(theOptions.getOutputFormat());
		retVal.setExportStyle(theOptions.getExportStyle());

		Set<String> filters = theOptions.getFilters();
		if (filters != null) {
			retVal.setFilters(new ArrayList<>(filters));
		}

		if (theOptions.getGroupId() != null) {
			retVal.setGroupId(theOptions.getGroupId().getValue());
		}

		retVal.setExpandMdm(theOptions.isExpandMdm());

		// Resource types are special: if none are provided, the job submitter would add
		// them, but a job cannot be started manually without correct parameters, so they
		// are filled in here.
		Set<String> requestedTypes = theOptions.getResourceTypes();
		if (CollectionUtils.isNotEmpty(requestedTypes)) {
			retVal.setResourceTypes(new ArrayList<>(requestedTypes));
		} else {
			addAllResourceTypes(retVal, theCtx);
		}

		return retVal;
	}

	/**
	 * Populates the resource types from the context according to the export style:
	 * SYSTEM gets every type except Binary, any other non-null style gets the types in the
	 * patient compartment, and a null style leaves the parameters untouched.
	 */
	private static void addAllResourceTypes(BulkExportParameters theOptions, FhirContext theCtx) {
		Set<String> allTypes = theCtx.getResourceTypes();

		BulkDataExportOptions.ExportStyle style = theOptions.getExportStyle();
		if (style == null) {
			return;
		}

		boolean systemExport = style == BulkDataExportOptions.ExportStyle.SYSTEM;
		List<String> selectedTypes = new ArrayList<>();
		for (String nextType : allTypes) {
			boolean include;
			if (systemExport) {
				// everything
				include = !nextType.equalsIgnoreCase("Binary");
			} else {
				// patients
				include = SearchParameterUtil.isResourceTypeInPatientCompartment(theCtx, nextType);
			}
			if (include) {
				selectedTypes.add(nextType);
			}
		}
		theOptions.setResourceTypes(selectedTypes);
	}
}

View File

@ -22,9 +22,14 @@ package ca.uhn.fhir.rest.api.server.bulk;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
// TODO: JA in next ticket - We have 3 more or less identical classes representing
// bulk data job parameters: BulkDataExportOptions, BulkExportJobParameters, and BulkExportParameters
// They don't seem to serve any distinct purpose so they should be collapsed into 1
public class BulkDataExportOptions {
public enum ExportStyle {
@ -37,6 +42,7 @@ public class BulkDataExportOptions {
private Set<String> myResourceTypes;
private Date mySince;
private Set<String> myFilters;
private Set<String> myPostFetchFilterUrls;
private ExportStyle myExportStyle;
private boolean myExpandMdm;
private IIdType myGroupId;
@ -90,6 +96,18 @@ public class BulkDataExportOptions {
return myFilters;
}
/**
 * Returns the post-fetch filter URLs, substituting (and storing) an empty set when
 * none have been set.
 */
@Nonnull
public Set<String> getPostFetchFilterUrls() {
	Set<String> urls = myPostFetchFilterUrls;
	if (urls == null) {
		urls = Set.of();
		myPostFetchFilterUrls = urls;
	}
	return urls;
}
/**
 * Sets the post-fetch filter URLs. The supplied set is stored as-is (no copy is made).
 */
public void setPostFetchFilterUrls(Set<String> thePostFetchFilterUrls) {
	this.myPostFetchFilterUrls = thePostFetchFilterUrls;
}
public boolean isExpandMdm() {
return myExpandMdm;
}

View File

@ -23,8 +23,12 @@ import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
@ -32,6 +36,8 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class BulkExportJobParametersValidator implements IJobParametersValidator<BulkExportJobParameters> {
/** @deprecated use BulkDataExportProvider.UNSUPPORTED_BINARY_TYPE instead */
@ -39,6 +45,8 @@ public class BulkExportJobParametersValidator implements IJobParametersValidator
public static final String UNSUPPORTED_BINARY_TYPE = BulkDataExportProvider.UNSUPPORTED_BINARY_TYPE;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
@Nullable
@Override
@ -81,6 +89,28 @@ public class BulkExportJobParametersValidator implements IJobParametersValidator
}
}
// Validate post fetch filter URLs
for (String next : theParameters.getPostFetchFilterUrls()) {
if (!next.contains("?") || isBlank(next.substring(next.indexOf('?') + 1))) {
errorMsgs.add("Invalid post-fetch filter URL, must be in the format [resourceType]?[parameters]: " + next);
continue;
}
String resourceType = next.substring(0, next.indexOf('?'));
if (!myDaoRegistry.isResourceTypeSupported(resourceType)) {
errorMsgs.add("Invalid post-fetch filter URL, unknown resource type: " + resourceType);
continue;
}
try {
InMemoryMatchResult inMemoryMatchResult = myInMemoryResourceMatcher.canBeEvaluatedInMemory(next);
if (!inMemoryMatchResult.supported()) {
errorMsgs.add("Invalid post-fetch filter URL, filter is not supported for in-memory matching \"" + next + "\". Reason: " + inMemoryMatchResult.getUnsupportedReason());
}
} catch (InvalidRequestException e) {
errorMsgs.add("Invalid post-fetch filter URL. Reason: " + e.getMessage());
}
}
return errorMsgs;
}
}

View File

@ -37,6 +37,8 @@ import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
@ -52,6 +54,7 @@ import org.springframework.context.ApplicationContext;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -84,24 +87,38 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
@Autowired
private IHapiTransactionService myTransactionService;
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
private volatile ResponseTerminologyTranslationSvc myResponseTerminologyTranslationSvc;
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails,
@Nonnull IJobDataSink<ExpandedResourcesList> theDataSink) throws JobExecutionFailedException {
String instanceId = theStepExecutionDetails.getInstance().getInstanceId();
String chunkId = theStepExecutionDetails.getChunkId();
ResourceIdList idList = theStepExecutionDetails.getData();
BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters();
BulkExportJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info("Step 2 for bulk export - Expand resources");
ourLog.info("About to expand {} resource IDs into their full resource bodies.", idList.getIds().size());
ourLog.info("Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.", instanceId, chunkId, idList.getIds().size());
// search the resources
List<IBaseResource> allResources = fetchAllResources(idList, jobParameters.getPartitionId());
List<IBaseResource> allResources = fetchAllResources(idList, parameters.getPartitionId());
// Apply post-fetch filtering
String resourceType = idList.getResourceType();
List<String> postFetchFilterUrls = parameters
.getPostFetchFilterUrls()
.stream()
.filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType))
.collect(Collectors.toList());
if (!postFetchFilterUrls.isEmpty()) {
applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId);
}
// if necessary, expand resources
if (jobParameters.isExpandMdm()) {
if (parameters.isExpandMdm()) {
myBulkExportProcessor.expandMdmResources(allResources);
}
@ -116,27 +133,62 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
}
// encode them - Key is resource type, Value is a collection of serialized resources of that type
ListMultimap<String, String> resources = encodeToString(allResources, jobParameters);
ListMultimap<String, String> resources = encodeToString(allResources, parameters);
// set to datasink
for (String nextResourceType : resources.keySet()) {
if (!resources.isEmpty()) {
for (String nextResourceType : resources.keySet()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(resources.get(nextResourceType));
output.setResourceType(nextResourceType);
theDataSink.accept(output);
ourLog.info("Expanding of {} resources of type {} completed",
idList.getIds().size(),
idList.getResourceType());
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(resources.get(nextResourceType));
output.setResourceType(nextResourceType);
theDataSink.accept(output);
ourLog.info("Expanding of {} resources of type {} completed",
idList.getIds().size(),
idList.getResourceType());
}
}
// and return
return RunOutcome.SUCCESS;
}
/**
 * Removes from {@code theResources}, in place, every resource that is not matched by at
 * least one of the supplied post-fetch filter URLs, then logs how many were removed.
 * <p>
 * A filter URL is only consulted for a resource when the text before its '?' equals the
 * resource's type; URLs without a '?' are ignored.
 *
 * @param theResources           the fetched resources; modified in place
 * @param thePostFetchFilterUrls filter URLs in the form {@code [resourceType]?[parameters]}
 * @param theInstanceId          job instance ID, used for logging only
 * @param theChunkId             work chunk ID, used for logging only
 */
private void applyPostFetchFiltering(List<IBaseResource> theResources, List<String> thePostFetchFilterUrls, String theInstanceId, String theChunkId) {
	int removedCount = 0;

	Iterator<IBaseResource> resourceIterator = theResources.iterator();
	while (resourceIterator.hasNext()) {
		IBaseResource candidate = resourceIterator.next();
		String candidateType = myFhirContext.getResourceType(candidate);

		boolean keep = false;
		for (String filterUrl : thePostFetchFilterUrls) {
			int queryStart = filterUrl.indexOf('?');
			if (queryStart == -1) {
				// Not in the [resourceType]?[parameters] form, so it cannot apply here
				continue;
			}

			String filterResourceType = filterUrl.substring(0, queryStart);
			if (!candidateType.equals(filterResourceType)) {
				continue;
			}

			InMemoryMatchResult outcome = myInMemoryResourceMatcher.match(filterUrl, candidate, null, new SystemRequestDetails());
			if (outcome.matched()) {
				// One matching filter is enough to keep the resource
				keep = true;
				break;
			}
		}

		if (!keep) {
			resourceIterator.remove();
			removedCount++;
		}
	}

	if (removedCount > 0) {
		ourLog.info("Bulk export instance[{}] chunk[{}] - {} resources were filtered out because of post-fetch filter URLs", theInstanceId, theChunkId, removedCount);
	}
}
private List<IBaseResource> fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) {
ArrayListMultimap<String, String> typeToIds = ArrayListMultimap.create();
theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId()));

View File

@ -47,8 +47,6 @@ import java.util.Set;
public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobParameters, ResourceIdList> {
private static final Logger ourLog = LoggerFactory.getLogger(FetchResourceIdsStep.class);
public static final int MAX_IDS_TO_BATCH = 900;
@Autowired
private IBulkExportProcessor myBulkExportProcessor;

View File

@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -49,6 +50,9 @@ public class BulkExportJobParameters extends BulkExportJobBase {
@JsonProperty("filters")
private List<String> myFilters;
@JsonProperty("postFetchFilterUrls")
private List<String> myPostFetchFilterUrls;
@JsonProperty("outputFormat")
private String myOutputFormat;
@ -74,6 +78,22 @@ public class BulkExportJobParameters extends BulkExportJobBase {
@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;
/**
 * Creates a new {@link BulkExportJobParameters} populated field-by-field from the given
 * {@link BulkExportParameters}. Values are passed through the setters as returned by the
 * source getters; no copies are made here.
 *
 * @param theParameters the parameters to copy from
 * @return the newly created job parameters
 */
public static BulkExportJobParameters createFromExportJobParameters(BulkExportParameters theParameters) {
	final BulkExportJobParameters retVal = new BulkExportJobParameters();

	// What to export
	retVal.setResourceTypes(theParameters.getResourceTypes());
	retVal.setExportStyle(theParameters.getExportStyle());

	// Filtering
	retVal.setFilters(theParameters.getFilters());
	retVal.setPostFetchFilterUrls(theParameters.getPostFetchFilterUrls());

	// Remaining options
	retVal.setGroupId(theParameters.getGroupId());
	retVal.setOutputFormat(theParameters.getOutputFormat());
	retVal.setStartDate(theParameters.getStartDate());
	retVal.setExpandMdm(theParameters.isExpandMdm());
	retVal.setPatientIds(theParameters.getPatientIds());

	// Request metadata
	retVal.setOriginalRequestUrl(theParameters.getOriginalRequestUrl());
	retVal.setPartitionId(theParameters.getPartitionId());

	return retVal;
}
public List<String> getResourceTypes() {
return myResourceTypes;
}
@ -98,6 +118,17 @@ public class BulkExportJobParameters extends BulkExportJobBase {
myFilters = theFilters;
}
/**
 * Returns the post-fetch filter URLs, creating (and storing) an empty mutable list when
 * none have been set.
 */
public List<String> getPostFetchFilterUrls() {
	List<String> urls = myPostFetchFilterUrls;
	if (urls == null) {
		urls = new ArrayList<>();
		myPostFetchFilterUrls = urls;
	}
	return urls;
}
/**
 * Sets the post-fetch filter URLs. The supplied list is stored as-is (no copy is made).
 */
public void setPostFetchFilterUrls(List<String> thePostFetchFilterUrls) {
	this.myPostFetchFilterUrls = thePostFetchFilterUrls;
}
public String getOutputFormat() {
return myOutputFormat;
}
@ -138,35 +169,20 @@ public class BulkExportJobParameters extends BulkExportJobBase {
myExpandMdm = theExpandMdm;
}
private void setOriginalRequestUrl(String theOriginalRequestUrl) {
this.myOriginalRequestUrl = theOriginalRequestUrl;
}
public String getOriginalRequestUrl() {
return myOriginalRequestUrl;
}
public void setPartitionId(RequestPartitionId thePartitionId) {
this.myPartitionId = thePartitionId;
private void setOriginalRequestUrl(String theOriginalRequestUrl) {
this.myOriginalRequestUrl = theOriginalRequestUrl;
}
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public static BulkExportJobParameters createFromExportJobParameters(BulkExportParameters theParameters) {
BulkExportJobParameters params = new BulkExportJobParameters();
params.setResourceTypes(theParameters.getResourceTypes());
params.setExportStyle(theParameters.getExportStyle());
params.setFilters(theParameters.getFilters());
params.setGroupId(theParameters.getGroupId());
params.setOutputFormat(theParameters.getOutputFormat());
params.setStartDate(theParameters.getStartDate());
params.setExpandMdm(theParameters.isExpandMdm());
params.setPatientIds(theParameters.getPatientIds());
params.setOriginalRequestUrl(theParameters.getOriginalRequestUrl());
params.setPartitionId(theParameters.getPartitionId());
return params;
public void setPartitionId(RequestPartitionId thePartitionId) {
this.myPartitionId = thePartitionId;
}
}

View File

@ -48,6 +48,8 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
*/
private List<String> myFilters;
private List<String> myPostFetchFilterUrls;
/**
* Export style - Patient, Group or Everything
*/
@ -125,6 +127,17 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
myFilters = theFilters;
}
/**
 * Returns the post-fetch filter URLs, creating (and storing) an empty mutable list when
 * none have been set.
 */
public List<String> getPostFetchFilterUrls() {
	List<String> urls = myPostFetchFilterUrls;
	if (urls == null) {
		urls = new ArrayList<>();
		myPostFetchFilterUrls = urls;
	}
	return urls;
}
/**
 * Sets the post-fetch filter URLs. The supplied list is stored as-is (no copy is made).
 */
public void setPostFetchFilterUrls(List<String> thePostFetchFilterUrls) {
	this.myPostFetchFilterUrls = thePostFetchFilterUrls;
}
public BulkDataExportOptions.ExportStyle getExportStyle() {
return myExportStyle;
}

View File

@ -124,12 +124,13 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypePostFetchFilterUrl,
ServletRequestDetails theRequestDetails
) {
// JPA export provider
validatePreferAsyncHeader(theRequestDetails, JpaConstants.OPERATION_EXPORT);
BulkDataExportOptions bulkDataExportOptions = buildSystemBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter);
BulkDataExportOptions bulkDataExportOptions = buildSystemBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theTypePostFetchFilterUrl);
startJob(theRequestDetails, bulkDataExportOptions);
}
@ -196,6 +197,7 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypePostFetchFilterUrl,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails
) {
@ -207,7 +209,7 @@ public class BulkDataExportProvider {
validatePreferAsyncHeader(theRequestDetails, JpaConstants.OPERATION_EXPORT);
BulkDataExportOptions bulkDataExportOptions = buildGroupBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theMdm);
BulkDataExportOptions bulkDataExportOptions = buildGroupBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theMdm, theTypePostFetchFilterUrl);
if (isNotEmpty(bulkDataExportOptions.getResourceTypes())) {
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
@ -249,11 +251,12 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypePostFetchFilterUrl,
@OperationParam(name = JpaConstants.PARAM_EXPORT_PATIENT, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> thePatient,
ServletRequestDetails theRequestDetails
) {
validatePreferAsyncHeader(theRequestDetails, JpaConstants.OPERATION_EXPORT);
BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, thePatient);
BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, thePatient, theTypePostFetchFilterUrl);
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
startJob(theRequestDetails, bulkDataExportOptions);
@ -269,10 +272,11 @@ public class BulkDataExportProvider {
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_POST_FETCH_FILTER_URL, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theTypePostFetchFilterUrl,
ServletRequestDetails theRequestDetails
) {
validatePreferAsyncHeader(theRequestDetails, JpaConstants.OPERATION_EXPORT);
BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam);
BulkDataExportOptions bulkDataExportOptions = buildPatientBulkExportOptions(theOutputFormat, theType, theSince, theTypeFilter, theIdParam, theTypePostFetchFilterUrl);
validateResourceTypesAllContainPatientSearchParams(bulkDataExportOptions.getResourceTypes());
startJob(theRequestDetails, bulkDataExportOptions);
@ -371,6 +375,7 @@ public class BulkDataExportProvider {
break;
default:
ourLog.warn("Unrecognized status encountered: {}. Treating as BUILDING/SUBMITTED", info.getStatus().name());
//noinspection fallthrough
case BUILDING:
case SUBMITTED:
if (theRequestDetails.getRequestType() == RequestTypeEnum.DELETE) {
@ -413,12 +418,12 @@ public class BulkDataExportProvider {
}
}
// Builds the options for a system-level export by delegating to buildBulkDataExportOptions
// with ExportStyle.SYSTEM; no further fields are set here.
// NOTE(review): this span is a rendered diff without +/- markers, so the signature and the
// return statement each appear twice. Presumably the first pair is the pre-change code and the
// second pair (which also forwards theTypePostFetchFilterUrl) is the post-change code - confirm
// against the real file before editing.
private BulkDataExportOptions buildSystemBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter) {
return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.SYSTEM);
private BulkDataExportOptions buildSystemBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, List<IPrimitiveType<String>> theTypePostFetchFilterUrl) {
return buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.SYSTEM, theTypePostFetchFilterUrl);
}
private BulkDataExportOptions buildGroupBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, IIdType theGroupId, IPrimitiveType<Boolean> theExpandMdm) {
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.GROUP);
private BulkDataExportOptions buildGroupBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, IIdType theGroupId, IPrimitiveType<Boolean> theExpandMdm, List<IPrimitiveType<String>> theTypePostFetchFilterUrl) {
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.GROUP, theTypePostFetchFilterUrl);
bulkDataExportOptions.setGroupId(theGroupId);
boolean mdm = false;
@ -430,26 +435,26 @@ public class BulkDataExportProvider {
return bulkDataExportOptions;
}
// Builds the options for a Patient-style export that may be restricted to a list of patient IDs.
// Delegates to buildBulkDataExportOptions with ExportStyle.PATIENT, then sets the patient IDs
// when a list was supplied.
// NOTE(review): this span is a rendered diff without +/- markers, so the signature and the
// delegating call each appear twice. Presumably the first of each pair is the pre-change code
// and the second (which adds/forwards theTypePostFetchFilterUrl) is the post-change code - confirm
// against the real file before editing.
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, List<IPrimitiveType<String>> thePatientIds) {
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, List<IPrimitiveType<String>> thePatientIds, List<IPrimitiveType<String>> theTypePostFetchFilterUrl) {
// Work on a local copy so a missing type can be replaced without reassigning the parameter
IPrimitiveType<String> type = theType;
if (type == null) {
// Type is optional, but the job requires it
type = new StringDt("Patient");
}
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, type, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT);
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, type, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT, theTypePostFetchFilterUrl);
// When no ID list is supplied, setPatientIds is not called at all
if (thePatientIds != null) {
// Each supplied value is wrapped in an IdType and the results are collected into a Set
bulkDataExportOptions.setPatientIds(thePatientIds.stream().map((pid) -> new IdType(pid.getValueAsString())).collect(Collectors.toSet()));
}
return bulkDataExportOptions;
}
// Builds the options for a Patient-style export of exactly one patient: delegates to
// buildBulkDataExportOptions with ExportStyle.PATIENT and then sets the patient IDs to a
// singleton set holding thePatientId.
// Unlike the list-based overload, theType is passed through as-is here (no "Patient" default).
// NOTE(review): this span is a rendered diff without +/- markers, so the signature and the
// delegating call each appear twice. Presumably the first pair is the pre-change code and the
// second pair (which adds/forwards theTypePostFetchFilterUrl) is the post-change code - confirm
// against the real file before editing.
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, IIdType thePatientId) {
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT);
private BulkDataExportOptions buildPatientBulkExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, IIdType thePatientId, List<IPrimitiveType<String>> theTypePostFetchFilterUrl) {
BulkDataExportOptions bulkDataExportOptions = buildBulkDataExportOptions(theOutputFormat, theType, theSince, theTypeFilter, BulkDataExportOptions.ExportStyle.PATIENT, theTypePostFetchFilterUrl);
bulkDataExportOptions.setPatientIds(Collections.singleton(thePatientId));
return bulkDataExportOptions;
}
private BulkDataExportOptions buildBulkDataExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, BulkDataExportOptions.ExportStyle theExportStyle) {
private BulkDataExportOptions buildBulkDataExportOptions(IPrimitiveType<String> theOutputFormat, IPrimitiveType<String> theType, IPrimitiveType<Date> theSince, List<IPrimitiveType<String>> theTypeFilter, BulkDataExportOptions.ExportStyle theExportStyle, List<IPrimitiveType<String>> theTypePostFetchFilterUrl) {
String outputFormat = theOutputFormat != null ? theOutputFormat.getValueAsString() : Constants.CT_FHIR_NDJSON;
Set<String> resourceTypes = null;
@ -463,9 +468,11 @@ public class BulkDataExportProvider {
}
Set<String> typeFilters = splitTypeFilters(theTypeFilter);
Set<String> typePostFetchFilterUrls = splitTypeFilters(theTypePostFetchFilterUrl);
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setFilters(typeFilters);
bulkDataExportOptions.setPostFetchFilterUrls(typePostFetchFilterUrls);
bulkDataExportOptions.setExportStyle(theExportStyle);
bulkDataExportOptions.setSince(since);
bulkDataExportOptions.setResourceTypes(resourceTypes);

View File

@ -40,9 +40,8 @@ public class BulkExportUtils {
parameters.setStartDate(theOptions.getSince());
parameters.setOutputFormat(theOptions.getOutputFormat());
parameters.setExportStyle(theOptions.getExportStyle());
if (CollectionUtils.isNotEmpty(theOptions.getFilters())) {
parameters.setFilters(new ArrayList<>(theOptions.getFilters()));
}
parameters.setFilters(new ArrayList<>(theOptions.getFilters()));
parameters.setPostFetchFilterUrls(new ArrayList<>(theOptions.getPostFetchFilterUrls()));
if (theOptions.getGroupId() != null) {
parameters.setGroupId(theOptions.getGroupId().getValue());
}