Enforce maximum chunk size in bulk export (#5800)

* Limit bulk export chunk size

* Cleanup

* Work on tests

* Working

* Add some docs

* Address revuew comments

* Spotless

* Test fix
This commit is contained in:
James Agnew 2024-04-01 09:28:46 -04:00 committed by GitHub
parent 9e5db198bf
commit 6ce3b17460
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 862 additions and 331 deletions

View File

@ -0,0 +1,6 @@
---
type: add
issue: 5800
title: "A new setting in JpaStorageSettings enforces a maximum file size for Bulk Export
output files, as well as work chunks creating during processing. This setting has
a default value of 100 MB."

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
@ -10,6 +11,9 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.rest.api.Constants;
@ -21,11 +25,13 @@ import ca.uhn.fhir.rest.client.apache.ResourceEntity;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.test.utilities.HttpClientExtension;
import ca.uhn.fhir.test.utilities.ProxyUtil;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.collect.Sets;
import jakarta.annotation.Nonnull;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
@ -66,6 +72,7 @@ import org.mockito.Spy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import java.io.IOException;
import java.io.StringReader;
@ -80,6 +87,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx.CREATE_REPORT_STEP;
import static ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx.WRITE_TO_BINARIES;
import static ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsInlineTest.createSearchParameterForInlineSecurity;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.awaitility.Awaitility.await;
@ -100,17 +109,25 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
@Autowired
private IBatch2WorkChunkRepository myWorkChunkRepository;
@Autowired
private IJobPersistence myJobPersistence;
private JpaJobPersistenceImpl myJobPersistenceImpl;
@AfterEach
void afterEach() {
myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.DISABLED);
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
myStorageSettings.setResourceClientIdStrategy(new JpaStorageSettings().getResourceClientIdStrategy());
JpaStorageSettings defaults = new JpaStorageSettings();
myStorageSettings.setTagStorageMode(defaults.getTagStorageMode());
myStorageSettings.setResourceClientIdStrategy(defaults.getResourceClientIdStrategy());
myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize());
}
@BeforeEach
public void beforeEach() {
myStorageSettings.setJobFastTrackingEnabled(false);
myJobPersistenceImpl = ProxyUtil.getSingletonTarget(myJobPersistence, JpaJobPersistenceImpl.class);
}
@Spy

View File

@ -0,0 +1,177 @@
package ca.uhn.fhir.jpa.bulk.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.ExpandResourceAndWriteBinaryStep;
import ca.uhn.fhir.batch2.jobs.export.ExpandResourcesStep;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import jakarta.persistence.Id;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ExpandResourcesAndWriteBinaryStepJpaTest extends BaseJpaR4Test {
@Autowired
private ExpandResourceAndWriteBinaryStep myExpandResourcesStep;
@Mock
private IJobDataSink<BulkExportBinaryFileId> mySink;
@Captor
private ArgumentCaptor<BulkExportBinaryFileId> myWorkChunkCaptor;
@Override
public void afterCleanupDao() {
super.afterCleanupDao();
JpaStorageSettings defaults = new JpaStorageSettings();
myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize());
}
@Test
public void testMaximumChunkSize() {
/*
* We're going to set the maximum file size to 3000, and create some resources with
* a name that is 1000 chars long. With the other boilerplate text in a resource that
* will put the resource length at just over 1000 chars, meaning that any given
* chunk or file should have only 2 resources in it.
*/
int testResourceSize = 1000;
int maxFileSize = 3 * testResourceSize;
myStorageSettings.setBulkExportFileMaximumSize(maxFileSize);
List<BatchResourceId> expectedIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A'));
String id = myPatientDao.create(p, mySrd).getId().getIdPart();
expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id));
}
Collections.sort(expectedIds);
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(expectedIds);
BulkExportJobParameters params = new BulkExportJobParameters();
JobInstance jobInstance = new JobInstance();
String chunkId = "ABC";
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId);
// Test
myExpandResourcesStep.run(details, mySink);
// Verify
verify(mySink, atLeast(1)).accept(myWorkChunkCaptor.capture());
List<BatchResourceId> actualResourceIdList = new ArrayList<>();
for (BulkExportBinaryFileId next : myWorkChunkCaptor.getAllValues()) {
Binary nextBinary = myBinaryDao.read(new IdType(next.getBinaryId()), mySrd);
String nextNdJsonString = new String(nextBinary.getContent(), StandardCharsets.UTF_8);
// This is the most important check here
assertThat(nextNdJsonString.length(), lessThanOrEqualTo(maxFileSize));
Arrays.stream(nextNdJsonString.split("\\n"))
.filter(StringUtils::isNotBlank)
.map(t->myFhirContext.newJsonParser().parseResource(t))
.map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart()))
.forEach(actualResourceIdList::add);
}
Collections.sort(actualResourceIdList);
assertEquals(expectedIds, actualResourceIdList);
}
@Test
public void testMaximumChunkSize_SingleFileExceedsMaximum() {
/*
* We're going to set the maximum file size to 1000, and create some resources
* with a name that is 1500 chars long. In this case, we'll exceed the
* configured maximum, so it should be one output file per resourcs.
*/
int testResourceSize = 1500;
int maxFileSize = 1000;
myStorageSettings.setBulkExportFileMaximumSize(maxFileSize);
List<BatchResourceId> expectedIds = new ArrayList<>();
int numberOfResources = 10;
for (int i = 0; i < numberOfResources; i++) {
Patient p = new Patient();
p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A'));
String id = myPatientDao.create(p, mySrd).getId().getIdPart();
expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id));
}
Collections.sort(expectedIds);
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(expectedIds);
BulkExportJobParameters params = new BulkExportJobParameters();
JobInstance jobInstance = new JobInstance();
String chunkId = "ABC";
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId);
// Test
myExpandResourcesStep.run(details, mySink);
// Verify
// This is the most important check - we should have one file per resource
verify(mySink, times(numberOfResources)).accept(myWorkChunkCaptor.capture());
List<BatchResourceId> actualResourceIdList = new ArrayList<>();
for (BulkExportBinaryFileId next : myWorkChunkCaptor.getAllValues()) {
Binary nextBinary = myBinaryDao.read(new IdType(next.getBinaryId()), mySrd);
String nextNdJsonString = new String(nextBinary.getContent(), StandardCharsets.UTF_8);
Arrays.stream(nextNdJsonString.split("\\n"))
.filter(StringUtils::isNotBlank)
.map(t->myFhirContext.newJsonParser().parseResource(t))
.map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart()))
.forEach(actualResourceIdList::add);
}
Collections.sort(actualResourceIdList);
assertEquals(expectedIds, actualResourceIdList);
}
}

View File

@ -3,13 +3,14 @@ package ca.uhn.fhir.jpa.bulk.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.ExpandResourcesStep;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@ -20,13 +21,17 @@ import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -44,7 +49,9 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test {
public void afterCleanupDao() {
super.afterCleanupDao();
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
JpaStorageSettings defaults = new JpaStorageSettings();
myStorageSettings.setTagStorageMode(defaults.getTagStorageMode());
myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize());
}
/**
@ -194,4 +201,60 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test {
}
@Test
public void testMaximumChunkSize() {
/*
* We're going to set the maximum file size to 3000, and create some resources with
* a name that is 1000 chars long. With the other boilerplate text in a resource that
* will put the resource length at just over 1000 chars, meaning that any given
* chunk or file should have only 2 resources in it.
*/
int testResourceSize = 1000;
int maxFileSize = 3 * testResourceSize;
myStorageSettings.setBulkExportFileMaximumSize(maxFileSize);
List<BatchResourceId> expectedIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A'));
String id = myPatientDao.create(p, mySrd).getId().getIdPart();
expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id));
}
Collections.sort(expectedIds);
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(expectedIds);
BulkExportJobParameters params = new BulkExportJobParameters();
JobInstance jobInstance = new JobInstance();
String chunkId = "ABC";
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId);
// Test
myCaptureQueriesListener.clear();
myExpandResourcesStep.run(details, mySink);
// Verify
verify(mySink, atLeast(1)).accept(myWorkChunkCaptor.capture());
List<BatchResourceId> actualResourceIdList = new ArrayList<>();
for (var next : myWorkChunkCaptor.getAllValues()) {
int nextSize = String.join("\n", next.getStringifiedResources()).length();
ourLog.info("Next size: {}", nextSize);
assertThat(nextSize, lessThanOrEqualTo(maxFileSize));
next.getStringifiedResources().stream()
.filter(StringUtils::isNotBlank)
.map(t->myFhirContext.newJsonParser().parseResource(t))
.map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart()))
.forEach(actualResourceIdList::add);
}
Collections.sort(actualResourceIdList);
assertEquals(expectedIds, actualResourceIdList);
}
}

View File

@ -10,14 +10,18 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsTest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.util.JsonUtil;
import org.hl7.fhir.r4.model.DateTimeType;
import org.hl7.fhir.r4.model.OrganizationAffiliation;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -38,7 +42,9 @@ public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test {
public void afterCleanupDao() {
super.afterCleanupDao();
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
JpaStorageSettings defaults = new JpaStorageSettings();
myStorageSettings.setTagStorageMode(defaults.getTagStorageMode());
myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize());
}
@Test
@ -74,6 +80,39 @@ public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test {
assertEquals(10, idList.getIds().size());
}
@Test
public void testChunkMaximumSize() {
myStorageSettings.setBulkExportFileMaximumSize(500);
for (int i = 0; i < 100; i++) {
OrganizationAffiliation orgAff = new OrganizationAffiliation();
orgAff.setActive(true);
myOrganizationAffiliationDao.create(orgAff, mySrd);
}
BulkExportJobParameters params = new BulkExportJobParameters();
params.setResourceTypes(List.of("OrganizationAffiliation"));
VoidModel data = new VoidModel();
JobInstance instance = new JobInstance();
instance.setInstanceId("instance-id");
String chunkId = "chunk-id";
StepExecutionDetails<BulkExportJobParameters, VoidModel> executionDetails = new StepExecutionDetails<>(params, data, instance, chunkId);
// Test
myFetchResourceIdsStep.run(executionDetails, mySink);
// Verify
verify(mySink, Mockito.atLeast(1)).accept(myResourceIdListCaptor.capture());
List<ResourceIdList> idLists = myResourceIdListCaptor.getAllValues();
for (var next : idLists) {
String serialized = JsonUtil.serialize(next, false);
// Note that the 600 is a bit higher than the configured maximum of 500 above,
// because our chunk size estimate is not totally accurate, but it's not
// going to be way off, less than 100 regardless of how big the maximum is
assertThat(serialized, serialized.length(), lessThanOrEqualTo(600));
}
}
}

View File

@ -0,0 +1,20 @@
package ca.uhn.fhir.batch2.jobs.models;
import ca.uhn.fhir.util.JsonUtil;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class BatchResourceIdTest {
@Test
public void testEstimateSize() {
BatchResourceId id = new BatchResourceId();
id.setId("12345");
id.setResourceType("Patient");
String serialized = JsonUtil.serialize(id, false);
assertEquals(serialized.length(), id.estimateSerializedSize(), serialized);
}
}

View File

@ -36,6 +36,7 @@ import org.springframework.context.annotation.Scope;
public class BulkExportAppCtx {
public static final String WRITE_TO_BINARIES = "write-to-binaries";
public static final String CREATE_REPORT_STEP = "create-report-step";
@Bean
public JobDefinition bulkExportJobDefinition() {
@ -65,7 +66,7 @@ public class BulkExportAppCtx {
writeBinaryStep())
// finalize the job (set to complete)
.addFinalReducerStep(
"create-report-step",
CREATE_REPORT_STEP,
"Creates the output report from a bulk export job",
BulkExportJobResults.class,
createReportStep())
@ -119,16 +120,25 @@ public class BulkExportAppCtx {
return new FetchResourceIdsStep();
}
/**
* Note, this bean is only used for version 1 of the bulk export job definition
*/
@Bean
public ExpandResourcesStep expandResourcesStep() {
return new ExpandResourcesStep();
}
/**
* Note, this bean is only used for version 1 of the bulk export job definition
*/
@Bean
public WriteBinaryStep writeBinaryStep() {
return new WriteBinaryStep();
}
/**
* Note, this bean is only used for version 2 of the bulk export job definition
*/
@Bean
public ExpandResourceAndWriteBinaryStep expandResourceAndWriteBinaryStep() {
return new ExpandResourceAndWriteBinaryStep();

View File

@ -34,6 +34,7 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
@ -41,7 +42,6 @@ import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
@ -75,11 +75,13 @@ import org.springframework.context.ApplicationContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static ca.uhn.fhir.rest.api.Constants.PARAM_ID;
@ -103,7 +105,7 @@ public class ExpandResourceAndWriteBinaryStep
private IBulkExportProcessor<?> myBulkExportProcessor;
@Autowired
private StorageSettings myStorageSettings;
private JpaStorageSettings myStorageSettings;
@Autowired
private ApplicationContext myApplicationContext;
@ -119,6 +121,23 @@ public class ExpandResourceAndWriteBinaryStep
private volatile ResponseTerminologyTranslationSvc myResponseTerminologyTranslationSvc;
/**
* Note on the design of this step:
* This step takes a list of resource PIDs as input, fetches those
* resources, applies a bunch of filtering/consent/MDM/etc. modifications
* on them, serializes the result as NDJSON files, and then persists those
* NDJSON files as Binary resources.
* <p>
* We want to avoid writing files which exceed the configured maximum
* file size, and we also want to avoid keeping too much in memory
* at any given time, so this class works a bit like a stream processor
* (although not using Java streams).
* <p>
* The {@link #fetchResourcesByIdAndConsumeThem(ResourceIdList, RequestPartitionId, Consumer)}
* method loads the resources by ID, {@link ExpandResourcesConsumer} handles
* the filtering and whatnot, then the {@link NdJsonResourceWriter}
* ultimately writes them.
*/
@Nonnull
@Override
public RunOutcome run(
@ -126,235 +145,36 @@ public class ExpandResourceAndWriteBinaryStep
@Nonnull IJobDataSink<BulkExportBinaryFileId> theDataSink)
throws JobExecutionFailedException {
List<ExpandedResourcesList> expandedResourcesList = expandResourcesFromList(theStepExecutionDetails);
int numResourcesProcessed = 0;
ourLog.info("Write binary step of Job Export");
// Currently only NDJSON output format is supported, but we could add other
// kinds of writers here for other formats if needed
NdJsonResourceWriter resourceWriter = new NdJsonResourceWriter(theStepExecutionDetails, theDataSink);
// write to binary each resource type separately, without chunking, we need to do this in a loop now
for (ExpandedResourcesList expandedResources : expandedResourcesList) {
expandResourcesFromList(theStepExecutionDetails, resourceWriter);
numResourcesProcessed += expandedResources.getStringifiedResources().size();
ourLog.info("Writing {} resources to binary file", numResourcesProcessed);
@SuppressWarnings("unchecked")
IFhirResourceDao<IBaseBinary> binaryDao = myDaoRegistry.getResourceDao("Binary");
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
addMetadataExtensionsToBinary(theStepExecutionDetails, expandedResources, binary);
// TODO
// should be dependent on the output format in parameters but for now, only NDJSON is supported
binary.setContentType(Constants.CT_FHIR_NDJSON);
int processedRecordsCount = 0;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (OutputStreamWriter streamWriter = getStreamWriter(outputStream)) {
for (String stringified : expandedResources.getStringifiedResources()) {
streamWriter.append(stringified);
streamWriter.append("\n");
processedRecordsCount++;
}
streamWriter.flush();
outputStream.flush();
}
binary.setContent(outputStream.toByteArray());
} catch (IOException ex) {
String errorMsg = String.format(
"Failure to process resource of type %s : %s",
expandedResources.getResourceType(), ex.getMessage());
ourLog.error(errorMsg);
throw new JobExecutionFailedException(Msg.code(2431) + errorMsg);
}
SystemRequestDetails srd = new SystemRequestDetails();
BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters();
RequestPartitionId partitionId = jobParameters.getPartitionId();
if (partitionId == null) {
srd.setRequestPartitionId(RequestPartitionId.defaultPartition());
} else {
srd.setRequestPartitionId(partitionId);
}
// Pick a unique ID and retry until we get one that isn't already used. This is just to
// avoid any possibility of people guessing the IDs of these Binaries and fishing for them.
while (true) {
// Use a random ID to make it harder to guess IDs - 32 characters of a-zA-Z0-9
// has 190 bts of entropy according to https://www.omnicalculator.com/other/password-entropy
String proposedId = RandomTextUtils.newSecureRandomAlphaNumericString(32);
binary.setId(proposedId);
// Make sure we don't accidentally reuse an ID. This should be impossible given the
// amount of entropy in the IDs but might as well be sure.
try {
IBaseBinary output = binaryDao.read(binary.getIdElement(), new SystemRequestDetails(), true);
if (output != null) {
continue;
}
} catch (ResourceNotFoundException e) {
// good
}
break;
}
if (myFhirContext.getVersion().getVersion().isNewerThan(FhirVersionEnum.DSTU2)) {
if (isNotBlank(jobParameters.getBinarySecurityContextIdentifierSystem())
|| isNotBlank(jobParameters.getBinarySecurityContextIdentifierValue())) {
FhirTerser terser = myFhirContext.newTerser();
terser.setElement(
binary,
"securityContext.identifier.system",
jobParameters.getBinarySecurityContextIdentifierSystem());
terser.setElement(
binary,
"securityContext.identifier.value",
jobParameters.getBinarySecurityContextIdentifierValue());
}
}
DaoMethodOutcome outcome = binaryDao.update(binary, srd);
IIdType id = outcome.getId();
BulkExportBinaryFileId bulkExportBinaryFileId = new BulkExportBinaryFileId();
bulkExportBinaryFileId.setBinaryId(id.getValueAsString());
bulkExportBinaryFileId.setResourceType(expandedResources.getResourceType());
theDataSink.accept(bulkExportBinaryFileId);
ourLog.info(
"Binary writing complete for {} resources of type {}.",
processedRecordsCount,
expandedResources.getResourceType());
}
return new RunOutcome(numResourcesProcessed);
return new RunOutcome(resourceWriter.getNumResourcesProcessed());
}
private List<ExpandedResourcesList> expandResourcesFromList(
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails) {
List<ExpandedResourcesList> expandedResourcesList = new ArrayList<>();
String instanceId = theStepExecutionDetails.getInstance().getInstanceId();
String chunkId = theStepExecutionDetails.getChunkId();
private void expandResourcesFromList(
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails,
Consumer<ExpandedResourcesList> theResourceWriter) {
ResourceIdList idList = theStepExecutionDetails.getData();
BulkExportJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info(
"Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.",
instanceId,
chunkId,
idList.getIds().size());
Consumer<List<IBaseResource>> resourceListConsumer =
new ExpandResourcesConsumer(theStepExecutionDetails, theResourceWriter);
// search the resources
List<IBaseResource> allResources = fetchAllResources(idList, parameters.getPartitionId());
// Apply post-fetch filtering
String resourceType = idList.getResourceType();
List<String> postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream()
.filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType))
.collect(Collectors.toList());
if (!postFetchFilterUrls.isEmpty()) {
applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId);
}
// if necessary, expand resources
if (parameters.isExpandMdm()) {
myBulkExportProcessor.expandMdmResources(allResources);
}
// Normalize terminology
if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) {
ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc;
if (terminologyTranslationSvc == null) {
terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class);
myResponseTerminologyTranslationSvc = terminologyTranslationSvc;
}
terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources);
}
// Interceptor call
if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) {
for (Iterator<IBaseResource> iter = allResources.iterator(); iter.hasNext(); ) {
HookParams params = new HookParams()
.add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters())
.add(IBaseResource.class, iter.next());
boolean outcome =
myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params);
if (!outcome) {
iter.remove();
}
}
}
// encode them - Key is resource type, Value is a collection of serialized resources of that type
ListMultimap<String, String> resources = encodeToString(allResources, parameters);
for (String nextResourceType : resources.keySet()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(resources.get(nextResourceType));
output.setResourceType(nextResourceType);
expandedResourcesList.add(output);
ourLog.info(
"Expanding of {} resources of type {} completed",
idList.getIds().size(),
idList.getResourceType());
}
return expandedResourcesList;
fetchResourcesByIdAndConsumeThem(idList, parameters.getPartitionId(), resourceListConsumer);
}
private void applyPostFetchFiltering(
List<IBaseResource> theResources,
List<String> thePostFetchFilterUrls,
String theInstanceId,
String theChunkId) {
int numRemoved = 0;
for (Iterator<IBaseResource> iter = theResources.iterator(); iter.hasNext(); ) {
boolean matched = applyPostFetchFilteringForSingleResource(thePostFetchFilterUrls, iter);
if (!matched) {
iter.remove();
numRemoved++;
}
}
if (numRemoved > 0) {
ourLog.info(
"Bulk export instance[{}] chunk[{}] - {} resources were filtered out because of post-fetch filter URLs",
theInstanceId,
theChunkId,
numRemoved);
}
}
private boolean applyPostFetchFilteringForSingleResource(
List<String> thePostFetchFilterUrls, Iterator<IBaseResource> iter) {
IBaseResource nextResource = iter.next();
String nextResourceType = myFhirContext.getResourceType(nextResource);
for (String nextPostFetchFilterUrl : thePostFetchFilterUrls) {
if (nextPostFetchFilterUrl.contains("?")) {
String resourceType = nextPostFetchFilterUrl.substring(0, nextPostFetchFilterUrl.indexOf('?'));
if (nextResourceType.equals(resourceType)) {
InMemoryMatchResult matchResult = myInMemoryResourceMatcher.match(
nextPostFetchFilterUrl, nextResource, null, new SystemRequestDetails());
if (matchResult.matched()) {
return true;
}
}
}
}
return false;
}
private List<IBaseResource> fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) {
private void fetchResourcesByIdAndConsumeThem(
ResourceIdList theIds,
RequestPartitionId theRequestPartitionId,
Consumer<List<IBaseResource>> theResourceListConsumer) {
ArrayListMultimap<String, String> typeToIds = ArrayListMultimap.create();
theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId()));
List<IBaseResource> resources = new ArrayList<>(theIds.getIds().size());
for (String resourceType : typeToIds.keySet()) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
@ -383,31 +203,9 @@ public class ExpandResourceAndWriteBinaryStep
SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam);
IBundleProvider outcome =
dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId));
resources.addAll(outcome.getAllResources());
theResourceListConsumer.accept(outcome.getAllResources());
}
}
return resources;
}
private ListMultimap<String, String> encodeToString(
List<IBaseResource> theResources, BulkExportJobParameters theParameters) {
IParser parser = getParser(theParameters);
ListMultimap<String, String> retVal = ArrayListMultimap.create();
for (IBaseResource resource : theResources) {
String type = myFhirContext.getResourceType(resource);
String jsonResource = parser.encodeResourceToString(resource);
retVal.put(type, jsonResource);
}
return retVal;
}
private IParser getParser(BulkExportJobParameters theParameters) {
// The parser depends on the output format
// but for now, only ndjson is supported
// see WriteBinaryStep as well
return myFhirContext.newJsonParser().setPrettyPrint(false);
}
/**
@ -462,4 +260,310 @@ public class ExpandResourceAndWriteBinaryStep
public void setIdHelperServiceForUnitTest(IIdHelperService theIdHelperService) {
myIdHelperService = theIdHelperService;
}
/**
* This class takes a collection of lists of resources read from the
* repository, and processes them, then converts them into
* {@link ExpandedResourcesList} instances, each one of which corresponds
* to a single output file. We try to avoid exceeding the maximum file
* size defined in
* {@link JpaStorageSettings#getBulkExportFileMaximumSize()}
* so we will do our best to emit multiple lists in favour of emitting
* a list that exceeds that threshold.
*/
private class ExpandResourcesConsumer implements Consumer<List<IBaseResource>> {
private final Consumer<ExpandedResourcesList> myResourceWriter;
private final StepExecutionDetails<BulkExportJobParameters, ResourceIdList> myStepExecutionDetails;
public ExpandResourcesConsumer(
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails,
Consumer<ExpandedResourcesList> theResourceWriter) {
myStepExecutionDetails = theStepExecutionDetails;
myResourceWriter = theResourceWriter;
}
@Override
public void accept(List<IBaseResource> theResources) throws JobExecutionFailedException {
String instanceId = myStepExecutionDetails.getInstance().getInstanceId();
String chunkId = myStepExecutionDetails.getChunkId();
ResourceIdList idList = myStepExecutionDetails.getData();
BulkExportJobParameters parameters = myStepExecutionDetails.getParameters();
ourLog.info(
"Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.",
instanceId,
chunkId,
idList.getIds().size());
// Apply post-fetch filtering
String resourceType = idList.getResourceType();
List<String> postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream()
.filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType))
.collect(Collectors.toList());
if (!postFetchFilterUrls.isEmpty()) {
applyPostFetchFiltering(theResources, postFetchFilterUrls, instanceId, chunkId);
}
// if necessary, expand resources
if (parameters.isExpandMdm()) {
myBulkExportProcessor.expandMdmResources(theResources);
}
// Normalize terminology
if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) {
ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc;
if (terminologyTranslationSvc == null) {
terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class);
myResponseTerminologyTranslationSvc = terminologyTranslationSvc;
}
terminologyTranslationSvc.processResourcesForTerminologyTranslation(theResources);
}
// Interceptor call
if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) {
for (Iterator<IBaseResource> iter = theResources.iterator(); iter.hasNext(); ) {
HookParams params = new HookParams()
.add(BulkExportJobParameters.class, myStepExecutionDetails.getParameters())
.add(IBaseResource.class, iter.next());
boolean outcome =
myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params);
if (!outcome) {
iter.remove();
}
}
}
// encode them - Key is resource type, Value is a collection of serialized resources of that type
IParser parser = getParser(parameters);
ListMultimap<String, String> resourceTypeToStringifiedResources = ArrayListMultimap.create();
Map<String, Integer> resourceTypeToTotalSize = new HashMap<>();
for (IBaseResource resource : theResources) {
String type = myFhirContext.getResourceType(resource);
int existingSize = resourceTypeToTotalSize.getOrDefault(type, 0);
String jsonResource = parser.encodeResourceToString(resource);
int newSize = existingSize + jsonResource.length();
// If adding another stringified resource to the list for the given type
// would exceed the configured maximum allowed, then let's send the current
// list and flush it. Note that if a single resource exceeds the configurable
// maximum then we have no choice but to send it
long bulkExportFileMaximumSize = myStorageSettings.getBulkExportFileMaximumSize();
if (newSize > bulkExportFileMaximumSize) {
if (existingSize == 0) {
// If no files are already in the collection, then this one file
// is bigger than the maximum allowable. We'll allow it in that
// case
ourLog.warn(
"Single resource size {} exceeds allowable maximum of {}, so will ignore maximum",
newSize,
bulkExportFileMaximumSize);
} else {
// Otherwise, flush the contents now before adding the next file
List<String> stringifiedResources = resourceTypeToStringifiedResources.get(type);
writeStringifiedResources(type, stringifiedResources);
resourceTypeToStringifiedResources.removeAll(type);
newSize = jsonResource.length();
}
}
resourceTypeToStringifiedResources.put(type, jsonResource);
resourceTypeToTotalSize.put(type, newSize);
}
for (String nextResourceType : resourceTypeToStringifiedResources.keySet()) {
List<String> stringifiedResources = resourceTypeToStringifiedResources.get(nextResourceType);
writeStringifiedResources(nextResourceType, stringifiedResources);
}
}
private void writeStringifiedResources(String theResourceType, List<String> theStringifiedResources) {
if (!theStringifiedResources.isEmpty()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(theStringifiedResources);
output.setResourceType(theResourceType);
myResourceWriter.accept(output);
ourLog.info(
"Expanding of {} resources of type {} completed",
theStringifiedResources.size(),
theResourceType);
}
}
private void applyPostFetchFiltering(
List<IBaseResource> theResources,
List<String> thePostFetchFilterUrls,
String theInstanceId,
String theChunkId) {
int numRemoved = 0;
for (Iterator<IBaseResource> iter = theResources.iterator(); iter.hasNext(); ) {
boolean matched = applyPostFetchFilteringForSingleResource(thePostFetchFilterUrls, iter);
if (!matched) {
iter.remove();
numRemoved++;
}
}
if (numRemoved > 0) {
ourLog.info(
"Bulk export instance[{}] chunk[{}] - {} resources were filtered out because of post-fetch filter URLs",
theInstanceId,
theChunkId,
numRemoved);
}
}
private boolean applyPostFetchFilteringForSingleResource(
List<String> thePostFetchFilterUrls, Iterator<IBaseResource> iter) {
IBaseResource nextResource = iter.next();
String nextResourceType = myFhirContext.getResourceType(nextResource);
for (String nextPostFetchFilterUrl : thePostFetchFilterUrls) {
if (nextPostFetchFilterUrl.contains("?")) {
String resourceType = nextPostFetchFilterUrl.substring(0, nextPostFetchFilterUrl.indexOf('?'));
if (nextResourceType.equals(resourceType)) {
InMemoryMatchResult matchResult = myInMemoryResourceMatcher.match(
nextPostFetchFilterUrl, nextResource, null, new SystemRequestDetails());
if (matchResult.matched()) {
return true;
}
}
}
}
return false;
}
private IParser getParser(BulkExportJobParameters theParameters) {
// The parser depends on the output format
// but for now, only ndjson is supported
// see WriteBinaryStep as well
return myFhirContext.newJsonParser().setPrettyPrint(false);
}
}
/**
* This class takes a collection of expanded resources, and expands it to
* an NDJSON file, which is written to a Binary resource.
*/
private class NdJsonResourceWriter implements Consumer<ExpandedResourcesList> {
private final StepExecutionDetails<BulkExportJobParameters, ResourceIdList> myStepExecutionDetails;
private final IJobDataSink<BulkExportBinaryFileId> myDataSink;
private int myNumResourcesProcessed = 0;
public NdJsonResourceWriter(
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails,
IJobDataSink<BulkExportBinaryFileId> theDataSink) {
this.myStepExecutionDetails = theStepExecutionDetails;
this.myDataSink = theDataSink;
}
public int getNumResourcesProcessed() {
return myNumResourcesProcessed;
}
@Override
public void accept(ExpandedResourcesList theExpandedResourcesList) throws JobExecutionFailedException {
int batchSize = theExpandedResourcesList.getStringifiedResources().size();
ourLog.info("Writing {} resources to binary file", batchSize);
myNumResourcesProcessed += batchSize;
@SuppressWarnings("unchecked")
IFhirResourceDao<IBaseBinary> binaryDao = myDaoRegistry.getResourceDao("Binary");
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
addMetadataExtensionsToBinary(myStepExecutionDetails, theExpandedResourcesList, binary);
binary.setContentType(Constants.CT_FHIR_NDJSON);
int processedRecordsCount = 0;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (OutputStreamWriter streamWriter = getStreamWriter(outputStream)) {
for (String stringified : theExpandedResourcesList.getStringifiedResources()) {
streamWriter.append(stringified);
streamWriter.append("\n");
processedRecordsCount++;
}
streamWriter.flush();
outputStream.flush();
}
binary.setContent(outputStream.toByteArray());
} catch (IOException ex) {
String errorMsg = String.format(
"Failure to process resource of type %s : %s",
theExpandedResourcesList.getResourceType(), ex.getMessage());
ourLog.error(errorMsg);
throw new JobExecutionFailedException(Msg.code(2431) + errorMsg);
}
SystemRequestDetails srd = new SystemRequestDetails();
BulkExportJobParameters jobParameters = myStepExecutionDetails.getParameters();
RequestPartitionId partitionId = jobParameters.getPartitionId();
if (partitionId == null) {
srd.setRequestPartitionId(RequestPartitionId.defaultPartition());
} else {
srd.setRequestPartitionId(partitionId);
}
// Pick a unique ID and retry until we get one that isn't already used. This is just to
// avoid any possibility of people guessing the IDs of these Binaries and fishing for them.
while (true) {
// Use a random ID to make it harder to guess IDs - 32 characters of a-zA-Z0-9
// has 190 bts of entropy according to https://www.omnicalculator.com/other/password-entropy
String proposedId = RandomTextUtils.newSecureRandomAlphaNumericString(32);
binary.setId(proposedId);
// Make sure we don't accidentally reuse an ID. This should be impossible given the
// amount of entropy in the IDs but might as well be sure.
try {
IBaseBinary output = binaryDao.read(binary.getIdElement(), new SystemRequestDetails(), true);
if (output != null) {
continue;
}
} catch (ResourceNotFoundException e) {
// good
}
break;
}
if (myFhirContext.getVersion().getVersion().isNewerThan(FhirVersionEnum.DSTU2)) {
if (isNotBlank(jobParameters.getBinarySecurityContextIdentifierSystem())
|| isNotBlank(jobParameters.getBinarySecurityContextIdentifierValue())) {
FhirTerser terser = myFhirContext.newTerser();
terser.setElement(
binary,
"securityContext.identifier.system",
jobParameters.getBinarySecurityContextIdentifierSystem());
terser.setElement(
binary,
"securityContext.identifier.value",
jobParameters.getBinarySecurityContextIdentifierValue());
}
}
DaoMethodOutcome outcome = binaryDao.update(binary, srd);
IIdType id = outcome.getId();
BulkExportBinaryFileId bulkExportBinaryFileId = new BulkExportBinaryFileId();
bulkExportBinaryFileId.setBinaryId(id.getValueAsString());
bulkExportBinaryFileId.setResourceType(theExpandedResourcesList.getResourceType());
myDataSink.accept(bulkExportBinaryFileId);
ourLog.info(
"Binary writing complete for {} resources of type {}.",
processedRecordsCount,
theExpandedResourcesList.getResourceType());
}
}
}

View File

@ -26,18 +26,19 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
@ -52,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import jakarta.annotation.Nonnull;
import org.apache.commons.collections4.ListUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -84,7 +86,7 @@ public class ExpandResourcesStep
private ApplicationContext myApplicationContext;
@Autowired
private StorageSettings myStorageSettings;
private JpaStorageSettings myStorageSettings;
@Autowired
private IIdHelperService myIdHelperService;
@ -108,72 +110,99 @@ public class ExpandResourcesStep
throws JobExecutionFailedException {
String instanceId = theStepExecutionDetails.getInstance().getInstanceId();
String chunkId = theStepExecutionDetails.getChunkId();
ResourceIdList idList = theStepExecutionDetails.getData();
ResourceIdList data = theStepExecutionDetails.getData();
BulkExportJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info(
"Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.",
instanceId,
chunkId,
idList.getIds().size());
data.getIds().size());
// search the resources
List<IBaseResource> allResources = fetchAllResources(idList, parameters.getPartitionId());
// Partition the ID list in order to only fetch a reasonable number at a time
List<List<BatchResourceId>> idLists = ListUtils.partition(data.getIds(), 100);
// Apply post-fetch filtering
String resourceType = idList.getResourceType();
List<String> postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream()
.filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType))
.collect(Collectors.toList());
for (List<BatchResourceId> idList : idLists) {
if (!postFetchFilterUrls.isEmpty()) {
applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId);
}
// search the resources
List<IBaseResource> allResources = fetchAllResources(idList, parameters.getPartitionId());
// if necessary, expand resources
if (parameters.isExpandMdm()) {
myBulkExportProcessor.expandMdmResources(allResources);
}
// Apply post-fetch filtering
String resourceType = data.getResourceType();
List<String> postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream()
.filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType))
.collect(Collectors.toList());
// Normalize terminology
if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) {
ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc;
if (terminologyTranslationSvc == null) {
terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class);
myResponseTerminologyTranslationSvc = terminologyTranslationSvc;
if (!postFetchFilterUrls.isEmpty()) {
applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId);
}
terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources);
}
// Interceptor call
if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) {
for (Iterator<IBaseResource> iter = allResources.iterator(); iter.hasNext(); ) {
HookParams params = new HookParams()
.add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters())
.add(IBaseResource.class, iter.next());
boolean outcome =
myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params);
if (!outcome) {
iter.remove();
// if necessary, expand resources
if (parameters.isExpandMdm()) {
myBulkExportProcessor.expandMdmResources(allResources);
}
// Normalize terminology
if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) {
ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc;
if (terminologyTranslationSvc == null) {
terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class);
myResponseTerminologyTranslationSvc = terminologyTranslationSvc;
}
terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources);
}
// Interceptor call
if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) {
for (Iterator<IBaseResource> iter = allResources.iterator(); iter.hasNext(); ) {
HookParams params = new HookParams()
.add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters())
.add(IBaseResource.class, iter.next());
boolean outcome =
myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params);
if (!outcome) {
iter.remove();
}
}
}
}
// encode them - Key is resource type, Value is a collection of serialized resources of that type
ListMultimap<String, String> resources = encodeToString(allResources, parameters);
// encode them - Key is resource type, Value is a collection of serialized resources of that type
ListMultimap<String, String> resources = encodeToString(allResources, parameters);
// set to datasink
for (String nextResourceType : resources.keySet()) {
// send to datasink
long maxFileSize = myStorageSettings.getBulkExportFileMaximumSize();
long currentFileSize = 0;
for (String nextResourceType : resources.keySet()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(resources.get(nextResourceType));
output.setResourceType(nextResourceType);
theDataSink.accept(output);
List<String> stringifiedResources = resources.get(nextResourceType);
List<String> currentFileStringifiedResources = new ArrayList<>();
ourLog.info(
"Expanding of {} resources of type {} completed",
idList.getIds().size(),
idList.getResourceType());
for (String nextStringifiedResource : stringifiedResources) {
if (currentFileSize + nextStringifiedResource.length() > maxFileSize
&& !currentFileStringifiedResources.isEmpty()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(currentFileStringifiedResources);
output.setResourceType(nextResourceType);
theDataSink.accept(output);
currentFileStringifiedResources = new ArrayList<>();
currentFileSize = 0;
}
currentFileStringifiedResources.add(nextStringifiedResource);
currentFileSize += nextStringifiedResource.length();
}
if (!currentFileStringifiedResources.isEmpty()) {
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(currentFileStringifiedResources);
output.setResourceType(nextResourceType);
theDataSink.accept(output);
}
ourLog.info("Expanding of {} resources of type {} completed", idList.size(), data.getResourceType());
}
}
// and return
@ -224,42 +253,36 @@ public class ExpandResourcesStep
return false;
}
private List<IBaseResource> fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) {
private List<IBaseResource> fetchAllResources(
List<BatchResourceId> theIds, RequestPartitionId theRequestPartitionId) {
ArrayListMultimap<String, String> typeToIds = ArrayListMultimap.create();
theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId()));
theIds.forEach(t -> typeToIds.put(t.getResourceType(), t.getId()));
List<IBaseResource> resources = new ArrayList<>(theIds.getIds().size());
List<IBaseResource> resources = new ArrayList<>(theIds.size());
for (String resourceType : typeToIds.keySet()) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
List<String> allIds = typeToIds.get(resourceType);
while (!allIds.isEmpty()) {
// Load in batches in order to avoid having too many PIDs go into a
// single SQ statement at once
int batchSize = Math.min(500, allIds.size());
Set<IResourcePersistentId> nextBatchOfPids = allIds.stream()
.map(t -> myIdHelperService.newPidFromStringIdAndResourceName(t, resourceType))
.collect(Collectors.toSet());
Set<IResourcePersistentId> nextBatchOfPids = allIds.subList(0, batchSize).stream()
.map(t -> myIdHelperService.newPidFromStringIdAndResourceName(t, resourceType))
.collect(Collectors.toSet());
allIds = allIds.subList(batchSize, allIds.size());
PersistentIdToForcedIdMap nextBatchOfResourceIds = myTransactionService
.withRequest(null)
.execute(() -> myIdHelperService.translatePidsToForcedIds(nextBatchOfPids));
PersistentIdToForcedIdMap nextBatchOfResourceIds = myTransactionService
.withRequest(null)
.execute(() -> myIdHelperService.translatePidsToForcedIds(nextBatchOfPids));
TokenOrListParam idListParam = new TokenOrListParam();
for (IResourcePersistentId nextPid : nextBatchOfPids) {
Optional<String> resourceId = nextBatchOfResourceIds.get(nextPid);
idListParam.add(resourceId.orElse(nextPid.getId().toString()));
}
SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam);
IBundleProvider outcome =
dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId));
resources.addAll(outcome.getAllResources());
TokenOrListParam idListParam = new TokenOrListParam();
for (IResourcePersistentId nextPid : nextBatchOfPids) {
Optional<String> resourceId = nextBatchOfResourceIds.get(nextPid);
idListParam.add(resourceId.orElse(nextPid.getId().toString()));
}
SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam);
IBundleProvider outcome =
dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId));
resources.addAll(outcome.getAllResources());
}
return resources;

View File

@ -102,6 +102,8 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
myBulkExportProcessor.getResourcePidIterator(providerParams);
List<BatchResourceId> idsToSubmit = new ArrayList<>();
int estimatedChunkSize = 0;
if (!pidIterator.hasNext()) {
ourLog.debug("Bulk Export generated an iterator with no results!");
}
@ -121,17 +123,25 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
idsToSubmit.add(batchResourceId);
if (estimatedChunkSize > 0) {
// Account for comma between array entries
estimatedChunkSize++;
}
estimatedChunkSize += batchResourceId.estimateSerializedSize();
// Make sure resources stored in each batch does not go over the max capacity
if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity()) {
submitWorkChunk(idsToSubmit, resourceType, params, theDataSink);
if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity()
|| estimatedChunkSize >= myStorageSettings.getBulkExportFileMaximumSize()) {
submitWorkChunk(idsToSubmit, resourceType, theDataSink);
submissionCount++;
idsToSubmit = new ArrayList<>();
estimatedChunkSize = 0;
}
}
// if we have any other Ids left, submit them now
if (!idsToSubmit.isEmpty()) {
submitWorkChunk(idsToSubmit, resourceType, params, theDataSink);
submitWorkChunk(idsToSubmit, resourceType, theDataSink);
submissionCount++;
}
}
@ -150,7 +160,6 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
private void submitWorkChunk(
List<BatchResourceId> theBatchResourceIds,
String theResourceType,
BulkExportJobParameters theParams,
IJobDataSink<ResourceIdList> theDataSink) {
ResourceIdList idList = new ResourceIdList();

View File

@ -22,10 +22,13 @@ package ca.uhn.fhir.batch2.jobs.models;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
public class BatchResourceId implements IModelJson {
import static org.apache.commons.lang3.StringUtils.defaultString;
public class BatchResourceId implements IModelJson, Comparable<BatchResourceId> {
@JsonProperty("type")
private String myResourceType;
@ -77,6 +80,24 @@ public class BatchResourceId implements IModelJson {
return new HashCodeBuilder(17, 37).append(myResourceType).append(myId).toHashCode();
}
/**
* Returns an estimate of how long the JSON serialized (non-pretty printed) form
* of this object will be.
*/
public int estimateSerializedSize() {
// 19 chars: {"id":"","type":""}
return 19 + defaultString(myId).length() + defaultString(myResourceType).length();
}
@Override
public int compareTo(@Nonnull BatchResourceId o) {
int retVal = o.myResourceType.compareTo(myResourceType);
if (retVal == 0) {
retVal = o.myId.compareTo(myId);
}
return retVal;
}
public static BatchResourceId getIdFromPID(IResourcePersistentId thePID, String theResourceType) {
BatchResourceId batchResourceId = new BatchResourceId();
batchResourceId.setId(thePID.getId().toString());

View File

@ -12,6 +12,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
@ -112,7 +113,7 @@ public class ExpandResourceAndWriteBinaryStepTest {
private FhirContext myFhirContext = FhirContext.forR4Cached();
@Spy
private StorageSettings myStorageSettings = new StorageSettings();
private JpaStorageSettings myStorageSettings = new JpaStorageSettings();
@Spy
private IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();

View File

@ -4,14 +4,14 @@ package ca.uhn.fhir.batch2.jobs.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
@ -20,8 +20,8 @@ import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
@ -75,7 +75,7 @@ public class ExpandResourcesStepTest {
private FhirContext myFhirContext = FhirContext.forR4Cached();
@Spy
private StorageSettings myStorageSettings = new StorageSettings();
private JpaStorageSettings myStorageSettings = new JpaStorageSettings();
@Spy
private IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();

View File

@ -130,6 +130,7 @@ public class FetchResourceIdsStepTest {
.thenReturn(observationIds.iterator());
int maxFileCapacity = 1000;
when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);
// test
RunOutcome outcome = myFirstStep.run(input, sink);
@ -191,6 +192,7 @@ public class FetchResourceIdsStepTest {
// when
int maxFileCapacity = 5;
when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);
for (int i = 0; i <= maxFileCapacity; i++) {
JpaPid id = JpaPid.fromId((long) i);

View File

@ -49,6 +49,10 @@ import java.util.TreeSet;
@SuppressWarnings("JavadocLinkAsPlainText")
public class JpaStorageSettings extends StorageSettings {
/**
* Default value for {@link #getBulkExportFileMaximumSize()}: 100 MB
*/
public static final long DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE = 100 * FileUtils.ONE_MB;
/**
* Default value for {@link #setReuseCachedSearchResultsForMillis(Long)}: 60000ms (one minute)
*/
@ -313,6 +317,10 @@ public class JpaStorageSettings extends StorageSettings {
* Since 6.2.0
*/
private int myBulkExportFileMaximumCapacity = DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY;
/**
* Since 7.2.0
*/
private long myBulkExportFileMaximumSize = DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE;
/**
* Since 6.4.0
*/
@ -2301,11 +2309,42 @@ public class JpaStorageSettings extends StorageSettings {
* Default is 1000 resources per file.
*
* @since 6.2.0
* @see #setBulkExportFileMaximumCapacity(int)
*/
public void setBulkExportFileMaximumCapacity(int theBulkExportFileMaximumCapacity) {
myBulkExportFileMaximumCapacity = theBulkExportFileMaximumCapacity;
}
/**
* Defines the maximum size for a single work chunk or report file to be held in
* memory or stored in the database for bulk export jobs.
* Note that the framework will attempt to not exceed this limit, but will only
* estimate the actual chunk size as it works, so this value should be set
* below any hard limits that may be present.
*
* @since 7.2.0
* @see #DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE The default value for this setting
*/
public long getBulkExportFileMaximumSize() {
return myBulkExportFileMaximumSize;
}
/**
* Defines the maximum size for a single work chunk or report file to be held in
* memory or stored in the database for bulk export jobs. Default is 100 MB.
* Note that the framework will attempt to not exceed this limit, but will only
* estimate the actual chunk size as it works, so this value should be set
* below any hard limits that may be present.
*
* @since 7.2.0
* @see #setBulkExportFileMaximumCapacity(int)
* @see #DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE The default value for this setting
*/
public void setBulkExportFileMaximumSize(long theBulkExportFileMaximumSize) {
Validate.isTrue(theBulkExportFileMaximumSize > 0, "theBulkExportFileMaximumSize must be positive");
myBulkExportFileMaximumSize = theBulkExportFileMaximumSize;
}
/**
* If this setting is enabled, then gated batch jobs that produce only one chunk will immediately trigger a batch
* maintenance job. This may be useful for testing, but is not recommended for production use.