diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5800-enforce-maximum-bulk-export-file-size.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5800-enforce-maximum-bulk-export-file-size.yaml new file mode 100644 index 00000000000..46b2c510e34 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5800-enforce-maximum-bulk-export-file-size.yaml @@ -0,0 +1,6 @@ +--- +type: add +issue: 5800 +title: "A new setting in JpaStorageSettings enforces a maximum file size for Bulk Export + output files, as well as work chunks creating during processing. This setting has + a default value of 100 MB." diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java index dad2eba02bd..1d0f89c493f 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataExportTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.bulk; import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.StatusEnum; @@ -10,6 +11,9 @@ import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl; +import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; +import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; import ca.uhn.fhir.rest.api.Constants; @@ -21,11 +25,13 @@ import ca.uhn.fhir.rest.client.apache.ResourceEntity; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.provider.ProviderConstants; import ca.uhn.fhir.test.utilities.HttpClientExtension; +import ca.uhn.fhir.test.utilities.ProxyUtil; import ca.uhn.fhir.util.Batch2JobDefinitionConstants; import ca.uhn.fhir.util.JsonUtil; import com.google.common.collect.Sets; import jakarta.annotation.Nonnull; import org.apache.commons.io.LineIterator; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; @@ -66,6 +72,7 @@ import org.mockito.Spy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; import java.io.IOException; import java.io.StringReader; @@ -80,6 +87,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx.CREATE_REPORT_STEP; +import static ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx.WRITE_TO_BINARIES; import static ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsInlineTest.createSearchParameterForInlineSecurity; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.awaitility.Awaitility.await; @@ -100,17 +109,25 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test { @Autowired private IJobCoordinator myJobCoordinator; + 
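As a hedged illustration of the new JpaStorageSettings limit described in the changelog entry at the top of this patch (not code from the patch itself; the wrapper class below is invented for the example, and only the setter/getter names are taken from calls visible elsewhere in this diff):

```java
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;

public class BulkExportLimitsConfigSketch {

	public static JpaStorageSettings storageSettings() {
		JpaStorageSettings settings = new JpaStorageSettings();

		// Cap each Bulk Export output file (and intermediate work chunk) at 50 MB
		// instead of the 100 MB default described in the changelog entry above.
		settings.setBulkExportFileMaximumSize(50 * 1024 * 1024);

		// The pre-existing count-based cap still applies alongside the new byte cap:
		// a file is also cut off once it holds this many resources, whichever limit
		// is reached first.
		settings.setBulkExportFileMaximumCapacity(1_000);

		return settings;
	}
}
```

The tests that follow drive the same setter directly via myStorageSettings.setBulkExportFileMaximumSize(...) and restore the default in their cleanup methods.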
@Autowired + private IBatch2WorkChunkRepository myWorkChunkRepository; + @Autowired + private IJobPersistence myJobPersistence; + private JpaJobPersistenceImpl myJobPersistenceImpl; @AfterEach void afterEach() { myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.DISABLED); - myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode()); - myStorageSettings.setResourceClientIdStrategy(new JpaStorageSettings().getResourceClientIdStrategy()); + JpaStorageSettings defaults = new JpaStorageSettings(); + myStorageSettings.setTagStorageMode(defaults.getTagStorageMode()); + myStorageSettings.setResourceClientIdStrategy(defaults.getResourceClientIdStrategy()); + myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize()); } @BeforeEach public void beforeEach() { myStorageSettings.setJobFastTrackingEnabled(false); + myJobPersistenceImpl = ProxyUtil.getSingletonTarget(myJobPersistence, JpaJobPersistenceImpl.class); } @Spy diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesAndWriteBinaryStepJpaTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesAndWriteBinaryStepJpaTest.java new file mode 100644 index 00000000000..3df3cc65040 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesAndWriteBinaryStepJpaTest.java @@ -0,0 +1,177 @@ +package ca.uhn.fhir.jpa.bulk.export; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.jobs.export.ExpandResourceAndWriteBinaryStep; +import ca.uhn.fhir.batch2.jobs.export.ExpandResourcesStep; +import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId; +import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList; +import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList; +import ca.uhn.fhir.batch2.jobs.models.BatchResourceId; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; +import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; +import jakarta.persistence.Id; +import org.apache.commons.lang3.StringUtils; +import org.hl7.fhir.r4.model.Binary; +import org.hl7.fhir.r4.model.IdType; +import org.hl7.fhir.r4.model.Patient; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.beans.factory.annotation.Autowired; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ExpandResourcesAndWriteBinaryStepJpaTest extends BaseJpaR4Test { + + @Autowired + private ExpandResourceAndWriteBinaryStep myExpandResourcesStep; + + @Mock + private IJobDataSink mySink; + @Captor + private 
ArgumentCaptor myWorkChunkCaptor; + + @Override + public void afterCleanupDao() { + super.afterCleanupDao(); + + JpaStorageSettings defaults = new JpaStorageSettings(); + myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize()); + } + + @Test + public void testMaximumChunkSize() { + /* + * We're going to set the maximum file size to 3000, and create some resources with + * a name that is 1000 chars long. With the other boilerplate text in a resource that + * will put the resource length at just over 1000 chars, meaning that any given + * chunk or file should have only 2 resources in it. + */ + int testResourceSize = 1000; + int maxFileSize = 3 * testResourceSize; + myStorageSettings.setBulkExportFileMaximumSize(maxFileSize); + + List expectedIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Patient p = new Patient(); + p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A')); + String id = myPatientDao.create(p, mySrd).getId().getIdPart(); + expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id)); + } + Collections.sort(expectedIds); + + ResourceIdList resourceList = new ResourceIdList(); + resourceList.setResourceType("Patient"); + resourceList.setIds(expectedIds); + + BulkExportJobParameters params = new BulkExportJobParameters(); + JobInstance jobInstance = new JobInstance(); + String chunkId = "ABC"; + + StepExecutionDetails details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId); + + // Test + + myExpandResourcesStep.run(details, mySink); + + // Verify + verify(mySink, atLeast(1)).accept(myWorkChunkCaptor.capture()); + List actualResourceIdList = new ArrayList<>(); + for (BulkExportBinaryFileId next : myWorkChunkCaptor.getAllValues()) { + + Binary nextBinary = myBinaryDao.read(new IdType(next.getBinaryId()), mySrd); + String nextNdJsonString = new String(nextBinary.getContent(), StandardCharsets.UTF_8); + + // This is the most important check here + assertThat(nextNdJsonString.length(), lessThanOrEqualTo(maxFileSize)); + + Arrays.stream(nextNdJsonString.split("\\n")) + .filter(StringUtils::isNotBlank) + .map(t->myFhirContext.newJsonParser().parseResource(t)) + .map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart())) + .forEach(actualResourceIdList::add); + + } + Collections.sort(actualResourceIdList); + assertEquals(expectedIds, actualResourceIdList); + } + + @Test + public void testMaximumChunkSize_SingleFileExceedsMaximum() { + /* + * We're going to set the maximum file size to 1000, and create some resources + * with a name that is 1500 chars long. In this case, we'll exceed the + * configured maximum, so it should be one output file per resourcs. 
+ */ + int testResourceSize = 1500; + int maxFileSize = 1000; + myStorageSettings.setBulkExportFileMaximumSize(maxFileSize); + + List expectedIds = new ArrayList<>(); + int numberOfResources = 10; + for (int i = 0; i < numberOfResources; i++) { + Patient p = new Patient(); + p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A')); + String id = myPatientDao.create(p, mySrd).getId().getIdPart(); + expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id)); + } + Collections.sort(expectedIds); + + ResourceIdList resourceList = new ResourceIdList(); + resourceList.setResourceType("Patient"); + resourceList.setIds(expectedIds); + + BulkExportJobParameters params = new BulkExportJobParameters(); + JobInstance jobInstance = new JobInstance(); + String chunkId = "ABC"; + + StepExecutionDetails details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId); + + // Test + + myExpandResourcesStep.run(details, mySink); + + // Verify + + // This is the most important check - we should have one file per resource + verify(mySink, times(numberOfResources)).accept(myWorkChunkCaptor.capture()); + + List actualResourceIdList = new ArrayList<>(); + for (BulkExportBinaryFileId next : myWorkChunkCaptor.getAllValues()) { + + Binary nextBinary = myBinaryDao.read(new IdType(next.getBinaryId()), mySrd); + String nextNdJsonString = new String(nextBinary.getContent(), StandardCharsets.UTF_8); + + Arrays.stream(nextNdJsonString.split("\\n")) + .filter(StringUtils::isNotBlank) + .map(t->myFhirContext.newJsonParser().parseResource(t)) + .map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart())) + .forEach(actualResourceIdList::add); + + } + Collections.sort(actualResourceIdList); + assertEquals(expectedIds, actualResourceIdList); + } + +} diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesStepJpaTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesStepJpaTest.java index 2b4257efe84..b77cd82fbbc 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesStepJpaTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/ExpandResourcesStepJpaTest.java @@ -3,13 +3,14 @@ package ca.uhn.fhir.jpa.bulk.export; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.jobs.export.ExpandResourcesStep; -import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList; import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList; import ca.uhn.fhir.batch2.jobs.models.BatchResourceId; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; +import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.r4.model.Patient; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -20,13 +21,17 @@ import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.IntStream; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -44,7 +49,9 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test { public void afterCleanupDao() { super.afterCleanupDao(); - myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode()); + JpaStorageSettings defaults = new JpaStorageSettings(); + myStorageSettings.setTagStorageMode(defaults.getTagStorageMode()); + myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize()); } /** @@ -194,4 +201,60 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test { } + @Test + public void testMaximumChunkSize() { + /* + * We're going to set the maximum file size to 3000, and create some resources with + * a name that is 1000 chars long. With the other boilerplate text in a resource that + * will put the resource length at just over 1000 chars, meaning that any given + * chunk or file should have only 2 resources in it. + */ + int testResourceSize = 1000; + int maxFileSize = 3 * testResourceSize; + myStorageSettings.setBulkExportFileMaximumSize(maxFileSize); + + List expectedIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Patient p = new Patient(); + p.addName().setFamily(StringUtils.leftPad("", testResourceSize, 'A')); + String id = myPatientDao.create(p, mySrd).getId().getIdPart(); + expectedIds.add(new BatchResourceId().setResourceType("Patient").setId(id)); + } + Collections.sort(expectedIds); + + ResourceIdList resourceList = new ResourceIdList(); + resourceList.setResourceType("Patient"); + resourceList.setIds(expectedIds); + + BulkExportJobParameters params = new BulkExportJobParameters(); + JobInstance jobInstance = new JobInstance(); + String chunkId = "ABC"; + + StepExecutionDetails details = new StepExecutionDetails<>(params, resourceList, jobInstance, chunkId); + + // Test + + myCaptureQueriesListener.clear(); + myExpandResourcesStep.run(details, mySink); + + // Verify + verify(mySink, atLeast(1)).accept(myWorkChunkCaptor.capture()); + List actualResourceIdList = new ArrayList<>(); + for (var next : myWorkChunkCaptor.getAllValues()) { + int nextSize = String.join("\n", next.getStringifiedResources()).length(); + ourLog.info("Next size: {}", nextSize); + assertThat(nextSize, lessThanOrEqualTo(maxFileSize)); + next.getStringifiedResources().stream() + .filter(StringUtils::isNotBlank) + .map(t->myFhirContext.newJsonParser().parseResource(t)) + .map(t->new BatchResourceId().setResourceType(t.getIdElement().getResourceType()).setId(t.getIdElement().getIdPart())) + .forEach(actualResourceIdList::add); + } + + Collections.sort(actualResourceIdList); + assertEquals(expectedIds, actualResourceIdList); + + + } + } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/FetchResourceIdsStepJpaTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/FetchResourceIdsStepJpaTest.java index e6e6fd08108..c27568b5ff1 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/FetchResourceIdsStepJpaTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/export/FetchResourceIdsStepJpaTest.java @@ -10,14 +10,18 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import 
ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsTest; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.util.JsonUtil; import org.hl7.fhir.r4.model.DateTimeType; import org.hl7.fhir.r4.model.OrganizationAffiliation; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,7 +42,9 @@ public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test { public void afterCleanupDao() { super.afterCleanupDao(); - myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode()); + JpaStorageSettings defaults = new JpaStorageSettings(); + myStorageSettings.setTagStorageMode(defaults.getTagStorageMode()); + myStorageSettings.setBulkExportFileMaximumSize(defaults.getBulkExportFileMaximumSize()); } @Test @@ -74,6 +80,39 @@ public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test { assertEquals(10, idList.getIds().size()); } + @Test + public void testChunkMaximumSize() { + myStorageSettings.setBulkExportFileMaximumSize(500); + for (int i = 0; i < 100; i++) { + OrganizationAffiliation orgAff = new OrganizationAffiliation(); + orgAff.setActive(true); + myOrganizationAffiliationDao.create(orgAff, mySrd); + } + + BulkExportJobParameters params = new BulkExportJobParameters(); + params.setResourceTypes(List.of("OrganizationAffiliation")); + VoidModel data = new VoidModel(); + JobInstance instance = new JobInstance(); + instance.setInstanceId("instance-id"); + String chunkId = "chunk-id"; + StepExecutionDetails executionDetails = new StepExecutionDetails<>(params, data, instance, chunkId); + + // Test + myFetchResourceIdsStep.run(executionDetails, mySink); + + // Verify + verify(mySink, Mockito.atLeast(1)).accept(myResourceIdListCaptor.capture()); + List idLists = myResourceIdListCaptor.getAllValues(); + for (var next : idLists) { + String serialized = JsonUtil.serialize(next, false); + + // Note that the 600 is a bit higher than the configured maximum of 500 above, + // because our chunk size estimate is not totally accurate, but it's not + // going to be way off, less than 100 regardless of how big the maximum is + assertThat(serialized, serialized.length(), lessThanOrEqualTo(600)); + } + + } } diff --git a/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceIdTest.java b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceIdTest.java new file mode 100644 index 00000000000..fce96e883e9 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4b/src/test/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceIdTest.java @@ -0,0 +1,20 @@ +package ca.uhn.fhir.batch2.jobs.models; + +import ca.uhn.fhir.util.JsonUtil; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class BatchResourceIdTest { + + @Test + public void testEstimateSize() { + BatchResourceId id = new BatchResourceId(); + id.setId("12345"); + id.setResourceType("Patient"); + String serialized = JsonUtil.serialize(id, false); + assertEquals(serialized.length(), id.estimateSerializedSize(), serialized); + } + + +} diff --git 
a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java index b44bb04f8c0..56de694bc49 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java @@ -36,6 +36,7 @@ import org.springframework.context.annotation.Scope; public class BulkExportAppCtx { public static final String WRITE_TO_BINARIES = "write-to-binaries"; + public static final String CREATE_REPORT_STEP = "create-report-step"; @Bean public JobDefinition bulkExportJobDefinition() { @@ -65,7 +66,7 @@ public class BulkExportAppCtx { writeBinaryStep()) // finalize the job (set to complete) .addFinalReducerStep( - "create-report-step", + CREATE_REPORT_STEP, "Creates the output report from a bulk export job", BulkExportJobResults.class, createReportStep()) @@ -119,16 +120,25 @@ public class BulkExportAppCtx { return new FetchResourceIdsStep(); } + /** + * Note, this bean is only used for version 1 of the bulk export job definition + */ @Bean public ExpandResourcesStep expandResourcesStep() { return new ExpandResourcesStep(); } + /** + * Note, this bean is only used for version 1 of the bulk export job definition + */ @Bean public WriteBinaryStep writeBinaryStep() { return new WriteBinaryStep(); } + /** + * Note, this bean is only used for version 2 of the bulk export job definition + */ @Bean public ExpandResourceAndWriteBinaryStep expandResourceAndWriteBinaryStep() { return new ExpandResourceAndWriteBinaryStep(); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStep.java index 6f43cc67967..6200ffdd8c0 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStep.java @@ -34,6 +34,7 @@ import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; @@ -41,7 +42,6 @@ import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; -import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; @@ -75,11 +75,13 @@ import org.springframework.context.ApplicationContext; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; import static 
ca.uhn.fhir.rest.api.Constants.PARAM_ID; @@ -103,7 +105,7 @@ public class ExpandResourceAndWriteBinaryStep private IBulkExportProcessor myBulkExportProcessor; @Autowired - private StorageSettings myStorageSettings; + private JpaStorageSettings myStorageSettings; @Autowired private ApplicationContext myApplicationContext; @@ -119,6 +121,23 @@ public class ExpandResourceAndWriteBinaryStep private volatile ResponseTerminologyTranslationSvc myResponseTerminologyTranslationSvc; + /** + * Note on the design of this step: + * This step takes a list of resource PIDs as input, fetches those + * resources, applies a bunch of filtering/consent/MDM/etc. modifications + * on them, serializes the result as NDJSON files, and then persists those + * NDJSON files as Binary resources. + *
<p>
+ * We want to avoid writing files which exceed the configured maximum + * file size, and we also want to avoid keeping too much in memory + * at any given time, so this class works a bit like a stream processor + * (although not using Java streams). + *
<p>
+ * The {@link #fetchResourcesByIdAndConsumeThem(ResourceIdList, RequestPartitionId, Consumer)} + * method loads the resources by ID, {@link ExpandResourcesConsumer} handles + * the filtering and whatnot, then the {@link NdJsonResourceWriter} + * ultimately writes them. + */ @Nonnull @Override public RunOutcome run( @@ -126,235 +145,36 @@ public class ExpandResourceAndWriteBinaryStep @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { - List expandedResourcesList = expandResourcesFromList(theStepExecutionDetails); - int numResourcesProcessed = 0; - ourLog.info("Write binary step of Job Export"); + // Currently only NDJSON output format is supported, but we could add other + // kinds of writers here for other formats if needed + NdJsonResourceWriter resourceWriter = new NdJsonResourceWriter(theStepExecutionDetails, theDataSink); - // write to binary each resource type separately, without chunking, we need to do this in a loop now - for (ExpandedResourcesList expandedResources : expandedResourcesList) { + expandResourcesFromList(theStepExecutionDetails, resourceWriter); - numResourcesProcessed += expandedResources.getStringifiedResources().size(); - - ourLog.info("Writing {} resources to binary file", numResourcesProcessed); - - @SuppressWarnings("unchecked") - IFhirResourceDao binaryDao = myDaoRegistry.getResourceDao("Binary"); - - IBaseBinary binary = BinaryUtil.newBinary(myFhirContext); - - addMetadataExtensionsToBinary(theStepExecutionDetails, expandedResources, binary); - - // TODO - // should be dependent on the output format in parameters but for now, only NDJSON is supported - binary.setContentType(Constants.CT_FHIR_NDJSON); - - int processedRecordsCount = 0; - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - try (OutputStreamWriter streamWriter = getStreamWriter(outputStream)) { - for (String stringified : expandedResources.getStringifiedResources()) { - streamWriter.append(stringified); - streamWriter.append("\n"); - processedRecordsCount++; - } - streamWriter.flush(); - outputStream.flush(); - } - binary.setContent(outputStream.toByteArray()); - } catch (IOException ex) { - String errorMsg = String.format( - "Failure to process resource of type %s : %s", - expandedResources.getResourceType(), ex.getMessage()); - ourLog.error(errorMsg); - - throw new JobExecutionFailedException(Msg.code(2431) + errorMsg); - } - - SystemRequestDetails srd = new SystemRequestDetails(); - BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters(); - RequestPartitionId partitionId = jobParameters.getPartitionId(); - if (partitionId == null) { - srd.setRequestPartitionId(RequestPartitionId.defaultPartition()); - } else { - srd.setRequestPartitionId(partitionId); - } - - // Pick a unique ID and retry until we get one that isn't already used. This is just to - // avoid any possibility of people guessing the IDs of these Binaries and fishing for them. - while (true) { - // Use a random ID to make it harder to guess IDs - 32 characters of a-zA-Z0-9 - // has 190 bts of entropy according to https://www.omnicalculator.com/other/password-entropy - String proposedId = RandomTextUtils.newSecureRandomAlphaNumericString(32); - binary.setId(proposedId); - - // Make sure we don't accidentally reuse an ID. This should be impossible given the - // amount of entropy in the IDs but might as well be sure. 
- try { - IBaseBinary output = binaryDao.read(binary.getIdElement(), new SystemRequestDetails(), true); - if (output != null) { - continue; - } - } catch (ResourceNotFoundException e) { - // good - } - - break; - } - - if (myFhirContext.getVersion().getVersion().isNewerThan(FhirVersionEnum.DSTU2)) { - if (isNotBlank(jobParameters.getBinarySecurityContextIdentifierSystem()) - || isNotBlank(jobParameters.getBinarySecurityContextIdentifierValue())) { - FhirTerser terser = myFhirContext.newTerser(); - terser.setElement( - binary, - "securityContext.identifier.system", - jobParameters.getBinarySecurityContextIdentifierSystem()); - terser.setElement( - binary, - "securityContext.identifier.value", - jobParameters.getBinarySecurityContextIdentifierValue()); - } - } - - DaoMethodOutcome outcome = binaryDao.update(binary, srd); - IIdType id = outcome.getId(); - - BulkExportBinaryFileId bulkExportBinaryFileId = new BulkExportBinaryFileId(); - bulkExportBinaryFileId.setBinaryId(id.getValueAsString()); - bulkExportBinaryFileId.setResourceType(expandedResources.getResourceType()); - theDataSink.accept(bulkExportBinaryFileId); - - ourLog.info( - "Binary writing complete for {} resources of type {}.", - processedRecordsCount, - expandedResources.getResourceType()); - } - return new RunOutcome(numResourcesProcessed); + return new RunOutcome(resourceWriter.getNumResourcesProcessed()); } - private List expandResourcesFromList( - StepExecutionDetails theStepExecutionDetails) { - List expandedResourcesList = new ArrayList<>(); - String instanceId = theStepExecutionDetails.getInstance().getInstanceId(); - String chunkId = theStepExecutionDetails.getChunkId(); + private void expandResourcesFromList( + StepExecutionDetails theStepExecutionDetails, + Consumer theResourceWriter) { + ResourceIdList idList = theStepExecutionDetails.getData(); BulkExportJobParameters parameters = theStepExecutionDetails.getParameters(); - ourLog.info( - "Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.", - instanceId, - chunkId, - idList.getIds().size()); + Consumer> resourceListConsumer = + new ExpandResourcesConsumer(theStepExecutionDetails, theResourceWriter); // search the resources - List allResources = fetchAllResources(idList, parameters.getPartitionId()); - - // Apply post-fetch filtering - String resourceType = idList.getResourceType(); - List postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream() - .filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType)) - .collect(Collectors.toList()); - - if (!postFetchFilterUrls.isEmpty()) { - applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId); - } - - // if necessary, expand resources - if (parameters.isExpandMdm()) { - myBulkExportProcessor.expandMdmResources(allResources); - } - - // Normalize terminology - if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) { - ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc; - if (terminologyTranslationSvc == null) { - terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class); - myResponseTerminologyTranslationSvc = terminologyTranslationSvc; - } - terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources); - } - - // Interceptor call - if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) { - for (Iterator iter = allResources.iterator(); iter.hasNext(); ) { - HookParams params = new 
HookParams() - .add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters()) - .add(IBaseResource.class, iter.next()); - boolean outcome = - myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params); - if (!outcome) { - iter.remove(); - } - } - } - - // encode them - Key is resource type, Value is a collection of serialized resources of that type - ListMultimap resources = encodeToString(allResources, parameters); - - for (String nextResourceType : resources.keySet()) { - - ExpandedResourcesList output = new ExpandedResourcesList(); - output.setStringifiedResources(resources.get(nextResourceType)); - output.setResourceType(nextResourceType); - expandedResourcesList.add(output); - - ourLog.info( - "Expanding of {} resources of type {} completed", - idList.getIds().size(), - idList.getResourceType()); - } - return expandedResourcesList; + fetchResourcesByIdAndConsumeThem(idList, parameters.getPartitionId(), resourceListConsumer); } - private void applyPostFetchFiltering( - List theResources, - List thePostFetchFilterUrls, - String theInstanceId, - String theChunkId) { - int numRemoved = 0; - for (Iterator iter = theResources.iterator(); iter.hasNext(); ) { - boolean matched = applyPostFetchFilteringForSingleResource(thePostFetchFilterUrls, iter); - - if (!matched) { - iter.remove(); - numRemoved++; - } - } - - if (numRemoved > 0) { - ourLog.info( - "Bulk export instance[{}] chunk[{}] - {} resources were filtered out because of post-fetch filter URLs", - theInstanceId, - theChunkId, - numRemoved); - } - } - - private boolean applyPostFetchFilteringForSingleResource( - List thePostFetchFilterUrls, Iterator iter) { - IBaseResource nextResource = iter.next(); - String nextResourceType = myFhirContext.getResourceType(nextResource); - - for (String nextPostFetchFilterUrl : thePostFetchFilterUrls) { - if (nextPostFetchFilterUrl.contains("?")) { - String resourceType = nextPostFetchFilterUrl.substring(0, nextPostFetchFilterUrl.indexOf('?')); - if (nextResourceType.equals(resourceType)) { - InMemoryMatchResult matchResult = myInMemoryResourceMatcher.match( - nextPostFetchFilterUrl, nextResource, null, new SystemRequestDetails()); - if (matchResult.matched()) { - return true; - } - } - } - } - return false; - } - - private List fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) { + private void fetchResourcesByIdAndConsumeThem( + ResourceIdList theIds, + RequestPartitionId theRequestPartitionId, + Consumer> theResourceListConsumer) { ArrayListMultimap typeToIds = ArrayListMultimap.create(); theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId())); - List resources = new ArrayList<>(theIds.getIds().size()); - for (String resourceType : typeToIds.keySet()) { IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); @@ -383,31 +203,9 @@ public class ExpandResourceAndWriteBinaryStep SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam); IBundleProvider outcome = dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId)); - resources.addAll(outcome.getAllResources()); + theResourceListConsumer.accept(outcome.getAllResources()); } } - - return resources; - } - - private ListMultimap encodeToString( - List theResources, BulkExportJobParameters theParameters) { - IParser parser = getParser(theParameters); - - ListMultimap retVal = ArrayListMultimap.create(); - for (IBaseResource resource : theResources) { - String type = 
myFhirContext.getResourceType(resource); - String jsonResource = parser.encodeResourceToString(resource); - retVal.put(type, jsonResource); - } - return retVal; - } - - private IParser getParser(BulkExportJobParameters theParameters) { - // The parser depends on the output format - // but for now, only ndjson is supported - // see WriteBinaryStep as well - return myFhirContext.newJsonParser().setPrettyPrint(false); } /** @@ -462,4 +260,310 @@ public class ExpandResourceAndWriteBinaryStep public void setIdHelperServiceForUnitTest(IIdHelperService theIdHelperService) { myIdHelperService = theIdHelperService; } + + /** + * This class takes a collection of lists of resources read from the + * repository, and processes them, then converts them into + * {@link ExpandedResourcesList} instances, each one of which corresponds + * to a single output file. We try to avoid exceeding the maximum file + * size defined in + * {@link JpaStorageSettings#getBulkExportFileMaximumSize()} + * so we will do our best to emit multiple lists in favour of emitting + * a list that exceeds that threshold. + */ + private class ExpandResourcesConsumer implements Consumer> { + + private final Consumer myResourceWriter; + private final StepExecutionDetails myStepExecutionDetails; + + public ExpandResourcesConsumer( + StepExecutionDetails theStepExecutionDetails, + Consumer theResourceWriter) { + myStepExecutionDetails = theStepExecutionDetails; + myResourceWriter = theResourceWriter; + } + + @Override + public void accept(List theResources) throws JobExecutionFailedException { + String instanceId = myStepExecutionDetails.getInstance().getInstanceId(); + String chunkId = myStepExecutionDetails.getChunkId(); + ResourceIdList idList = myStepExecutionDetails.getData(); + BulkExportJobParameters parameters = myStepExecutionDetails.getParameters(); + + ourLog.info( + "Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.", + instanceId, + chunkId, + idList.getIds().size()); + + // Apply post-fetch filtering + String resourceType = idList.getResourceType(); + List postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream() + .filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType)) + .collect(Collectors.toList()); + + if (!postFetchFilterUrls.isEmpty()) { + applyPostFetchFiltering(theResources, postFetchFilterUrls, instanceId, chunkId); + } + + // if necessary, expand resources + if (parameters.isExpandMdm()) { + myBulkExportProcessor.expandMdmResources(theResources); + } + + // Normalize terminology + if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) { + ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc; + if (terminologyTranslationSvc == null) { + terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class); + myResponseTerminologyTranslationSvc = terminologyTranslationSvc; + } + terminologyTranslationSvc.processResourcesForTerminologyTranslation(theResources); + } + + // Interceptor call + if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) { + for (Iterator iter = theResources.iterator(); iter.hasNext(); ) { + HookParams params = new HookParams() + .add(BulkExportJobParameters.class, myStepExecutionDetails.getParameters()) + .add(IBaseResource.class, iter.next()); + boolean outcome = + myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params); + if (!outcome) { + iter.remove(); + } + } + } + + // encode 
them - Key is resource type, Value is a collection of serialized resources of that type + IParser parser = getParser(parameters); + + ListMultimap resourceTypeToStringifiedResources = ArrayListMultimap.create(); + Map resourceTypeToTotalSize = new HashMap<>(); + for (IBaseResource resource : theResources) { + String type = myFhirContext.getResourceType(resource); + int existingSize = resourceTypeToTotalSize.getOrDefault(type, 0); + + String jsonResource = parser.encodeResourceToString(resource); + int newSize = existingSize + jsonResource.length(); + + // If adding another stringified resource to the list for the given type + // would exceed the configured maximum allowed, then let's send the current + // list and flush it. Note that if a single resource exceeds the configurable + // maximum then we have no choice but to send it + long bulkExportFileMaximumSize = myStorageSettings.getBulkExportFileMaximumSize(); + if (newSize > bulkExportFileMaximumSize) { + if (existingSize == 0) { + // If no files are already in the collection, then this one file + // is bigger than the maximum allowable. We'll allow it in that + // case + ourLog.warn( + "Single resource size {} exceeds allowable maximum of {}, so will ignore maximum", + newSize, + bulkExportFileMaximumSize); + } else { + // Otherwise, flush the contents now before adding the next file + List stringifiedResources = resourceTypeToStringifiedResources.get(type); + writeStringifiedResources(type, stringifiedResources); + + resourceTypeToStringifiedResources.removeAll(type); + newSize = jsonResource.length(); + } + } + + resourceTypeToStringifiedResources.put(type, jsonResource); + resourceTypeToTotalSize.put(type, newSize); + } + + for (String nextResourceType : resourceTypeToStringifiedResources.keySet()) { + List stringifiedResources = resourceTypeToStringifiedResources.get(nextResourceType); + writeStringifiedResources(nextResourceType, stringifiedResources); + } + } + + private void writeStringifiedResources(String theResourceType, List theStringifiedResources) { + if (!theStringifiedResources.isEmpty()) { + + ExpandedResourcesList output = new ExpandedResourcesList(); + output.setStringifiedResources(theStringifiedResources); + output.setResourceType(theResourceType); + myResourceWriter.accept(output); + + ourLog.info( + "Expanding of {} resources of type {} completed", + theStringifiedResources.size(), + theResourceType); + } + } + + private void applyPostFetchFiltering( + List theResources, + List thePostFetchFilterUrls, + String theInstanceId, + String theChunkId) { + int numRemoved = 0; + for (Iterator iter = theResources.iterator(); iter.hasNext(); ) { + boolean matched = applyPostFetchFilteringForSingleResource(thePostFetchFilterUrls, iter); + + if (!matched) { + iter.remove(); + numRemoved++; + } + } + + if (numRemoved > 0) { + ourLog.info( + "Bulk export instance[{}] chunk[{}] - {} resources were filtered out because of post-fetch filter URLs", + theInstanceId, + theChunkId, + numRemoved); + } + } + + private boolean applyPostFetchFilteringForSingleResource( + List thePostFetchFilterUrls, Iterator iter) { + IBaseResource nextResource = iter.next(); + String nextResourceType = myFhirContext.getResourceType(nextResource); + + for (String nextPostFetchFilterUrl : thePostFetchFilterUrls) { + if (nextPostFetchFilterUrl.contains("?")) { + String resourceType = nextPostFetchFilterUrl.substring(0, nextPostFetchFilterUrl.indexOf('?')); + if (nextResourceType.equals(resourceType)) { + InMemoryMatchResult matchResult = 
myInMemoryResourceMatcher.match( + nextPostFetchFilterUrl, nextResource, null, new SystemRequestDetails()); + if (matchResult.matched()) { + return true; + } + } + } + } + return false; + } + + private IParser getParser(BulkExportJobParameters theParameters) { + // The parser depends on the output format + // but for now, only ndjson is supported + // see WriteBinaryStep as well + return myFhirContext.newJsonParser().setPrettyPrint(false); + } + } + + /** + * This class takes a collection of expanded resources, and expands it to + * an NDJSON file, which is written to a Binary resource. + */ + private class NdJsonResourceWriter implements Consumer { + + private final StepExecutionDetails myStepExecutionDetails; + private final IJobDataSink myDataSink; + private int myNumResourcesProcessed = 0; + + public NdJsonResourceWriter( + StepExecutionDetails theStepExecutionDetails, + IJobDataSink theDataSink) { + this.myStepExecutionDetails = theStepExecutionDetails; + this.myDataSink = theDataSink; + } + + public int getNumResourcesProcessed() { + return myNumResourcesProcessed; + } + + @Override + public void accept(ExpandedResourcesList theExpandedResourcesList) throws JobExecutionFailedException { + int batchSize = theExpandedResourcesList.getStringifiedResources().size(); + ourLog.info("Writing {} resources to binary file", batchSize); + + myNumResourcesProcessed += batchSize; + + @SuppressWarnings("unchecked") + IFhirResourceDao binaryDao = myDaoRegistry.getResourceDao("Binary"); + + IBaseBinary binary = BinaryUtil.newBinary(myFhirContext); + + addMetadataExtensionsToBinary(myStepExecutionDetails, theExpandedResourcesList, binary); + + binary.setContentType(Constants.CT_FHIR_NDJSON); + + int processedRecordsCount = 0; + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + try (OutputStreamWriter streamWriter = getStreamWriter(outputStream)) { + for (String stringified : theExpandedResourcesList.getStringifiedResources()) { + streamWriter.append(stringified); + streamWriter.append("\n"); + processedRecordsCount++; + } + streamWriter.flush(); + outputStream.flush(); + } + binary.setContent(outputStream.toByteArray()); + } catch (IOException ex) { + String errorMsg = String.format( + "Failure to process resource of type %s : %s", + theExpandedResourcesList.getResourceType(), ex.getMessage()); + ourLog.error(errorMsg); + + throw new JobExecutionFailedException(Msg.code(2431) + errorMsg); + } + + SystemRequestDetails srd = new SystemRequestDetails(); + BulkExportJobParameters jobParameters = myStepExecutionDetails.getParameters(); + RequestPartitionId partitionId = jobParameters.getPartitionId(); + if (partitionId == null) { + srd.setRequestPartitionId(RequestPartitionId.defaultPartition()); + } else { + srd.setRequestPartitionId(partitionId); + } + + // Pick a unique ID and retry until we get one that isn't already used. This is just to + // avoid any possibility of people guessing the IDs of these Binaries and fishing for them. + while (true) { + // Use a random ID to make it harder to guess IDs - 32 characters of a-zA-Z0-9 + // has 190 bts of entropy according to https://www.omnicalculator.com/other/password-entropy + String proposedId = RandomTextUtils.newSecureRandomAlphaNumericString(32); + binary.setId(proposedId); + + // Make sure we don't accidentally reuse an ID. This should be impossible given the + // amount of entropy in the IDs but might as well be sure. 
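Stepping out of the patch for a moment, before the Binary-ID selection loop: the size-capping behaviour that the new ExpandResourcesConsumer above (and the reworked ExpandResourcesStep further down) apply reduces to the standalone sketch below. It is illustrative only — it measures size by raw string length, as the step does, ignores the per-resource-type grouping, and the class and method names are invented:

```java
import java.util.ArrayList;
import java.util.List;

public class SizeCappedPartitionSketch {

	/**
	 * Splits serialized (NDJSON line) resources into groups whose combined length
	 * stays at or under theMaxFileSize. A single resource that is larger than the
	 * cap is still emitted, alone, mirroring the "ignore maximum" warning in
	 * ExpandResourcesConsumer above.
	 */
	static List<List<String>> partitionBySize(List<String> theStringifiedResources, long theMaxFileSize) {
		List<List<String>> files = new ArrayList<>();
		List<String> currentFile = new ArrayList<>();
		long currentSize = 0;

		for (String next : theStringifiedResources) {
			// Flush the current file if adding this resource would exceed the cap,
			// unless the file is empty (an oversized lone resource is allowed through).
			if (currentSize + next.length() > theMaxFileSize && !currentFile.isEmpty()) {
				files.add(currentFile);
				currentFile = new ArrayList<>();
				currentSize = 0;
			}
			currentFile.add(next);
			currentSize += next.length();
		}

		if (!currentFile.isEmpty()) {
			files.add(currentFile);
		}
		return files;
	}
}
```

With a 3000-character cap and resources of just over 1000 characters this yields two resources per file; with a 1000-character cap and 1500-character resources it degrades to one file per resource — which is what the two testMaximumChunkSize tests earlier in this diff assert.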
+ try { + IBaseBinary output = binaryDao.read(binary.getIdElement(), new SystemRequestDetails(), true); + if (output != null) { + continue; + } + } catch (ResourceNotFoundException e) { + // good + } + + break; + } + + if (myFhirContext.getVersion().getVersion().isNewerThan(FhirVersionEnum.DSTU2)) { + if (isNotBlank(jobParameters.getBinarySecurityContextIdentifierSystem()) + || isNotBlank(jobParameters.getBinarySecurityContextIdentifierValue())) { + FhirTerser terser = myFhirContext.newTerser(); + terser.setElement( + binary, + "securityContext.identifier.system", + jobParameters.getBinarySecurityContextIdentifierSystem()); + terser.setElement( + binary, + "securityContext.identifier.value", + jobParameters.getBinarySecurityContextIdentifierValue()); + } + } + + DaoMethodOutcome outcome = binaryDao.update(binary, srd); + IIdType id = outcome.getId(); + + BulkExportBinaryFileId bulkExportBinaryFileId = new BulkExportBinaryFileId(); + bulkExportBinaryFileId.setBinaryId(id.getValueAsString()); + bulkExportBinaryFileId.setResourceType(theExpandedResourcesList.getResourceType()); + myDataSink.accept(bulkExportBinaryFileId); + + ourLog.info( + "Binary writing complete for {} resources of type {}.", + processedRecordsCount, + theExpandedResourcesList.getResourceType()); + } + } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java index e2cce9c5f31..8c8f0490d9e 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStep.java @@ -26,18 +26,19 @@ import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList; import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList; +import ca.uhn.fhir.batch2.jobs.models.BatchResourceId; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; -import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher; @@ -52,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import jakarta.annotation.Nonnull; +import org.apache.commons.collections4.ListUtils; import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; @@ -84,7 +86,7 @@ public class ExpandResourcesStep private ApplicationContext myApplicationContext; @Autowired - private StorageSettings myStorageSettings; + private JpaStorageSettings myStorageSettings; @Autowired private 
IIdHelperService myIdHelperService; @@ -108,72 +110,99 @@ public class ExpandResourcesStep throws JobExecutionFailedException { String instanceId = theStepExecutionDetails.getInstance().getInstanceId(); String chunkId = theStepExecutionDetails.getChunkId(); - ResourceIdList idList = theStepExecutionDetails.getData(); + ResourceIdList data = theStepExecutionDetails.getData(); BulkExportJobParameters parameters = theStepExecutionDetails.getParameters(); ourLog.info( "Bulk export instance[{}] chunk[{}] - About to expand {} resource IDs into their full resource bodies.", instanceId, chunkId, - idList.getIds().size()); + data.getIds().size()); - // search the resources - List allResources = fetchAllResources(idList, parameters.getPartitionId()); + // Partition the ID list in order to only fetch a reasonable number at a time + List> idLists = ListUtils.partition(data.getIds(), 100); - // Apply post-fetch filtering - String resourceType = idList.getResourceType(); - List postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream() - .filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType)) - .collect(Collectors.toList()); + for (List idList : idLists) { - if (!postFetchFilterUrls.isEmpty()) { - applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId); - } + // search the resources + List allResources = fetchAllResources(idList, parameters.getPartitionId()); - // if necessary, expand resources - if (parameters.isExpandMdm()) { - myBulkExportProcessor.expandMdmResources(allResources); - } + // Apply post-fetch filtering + String resourceType = data.getResourceType(); + List postFetchFilterUrls = parameters.getPostFetchFilterUrls().stream() + .filter(t -> t.substring(0, t.indexOf('?')).equals(resourceType)) + .collect(Collectors.toList()); - // Normalize terminology - if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) { - ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc; - if (terminologyTranslationSvc == null) { - terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class); - myResponseTerminologyTranslationSvc = terminologyTranslationSvc; + if (!postFetchFilterUrls.isEmpty()) { + applyPostFetchFiltering(allResources, postFetchFilterUrls, instanceId, chunkId); } - terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources); - } - // Interceptor call - if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) { - for (Iterator iter = allResources.iterator(); iter.hasNext(); ) { - HookParams params = new HookParams() - .add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters()) - .add(IBaseResource.class, iter.next()); - boolean outcome = - myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params); - if (!outcome) { - iter.remove(); + // if necessary, expand resources + if (parameters.isExpandMdm()) { + myBulkExportProcessor.expandMdmResources(allResources); + } + + // Normalize terminology + if (myStorageSettings.isNormalizeTerminologyForBulkExportJobs()) { + ResponseTerminologyTranslationSvc terminologyTranslationSvc = myResponseTerminologyTranslationSvc; + if (terminologyTranslationSvc == null) { + terminologyTranslationSvc = myApplicationContext.getBean(ResponseTerminologyTranslationSvc.class); + myResponseTerminologyTranslationSvc = terminologyTranslationSvc; + } + terminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources); + } + + // 
Interceptor call + if (myInterceptorService.hasHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION)) { + for (Iterator iter = allResources.iterator(); iter.hasNext(); ) { + HookParams params = new HookParams() + .add(BulkExportJobParameters.class, theStepExecutionDetails.getParameters()) + .add(IBaseResource.class, iter.next()); + boolean outcome = + myInterceptorService.callHooks(Pointcut.STORAGE_BULK_EXPORT_RESOURCE_INCLUSION, params); + if (!outcome) { + iter.remove(); + } } } - } - // encode them - Key is resource type, Value is a collection of serialized resources of that type - ListMultimap resources = encodeToString(allResources, parameters); + // encode them - Key is resource type, Value is a collection of serialized resources of that type + ListMultimap resources = encodeToString(allResources, parameters); - // set to datasink - for (String nextResourceType : resources.keySet()) { + // send to datasink + long maxFileSize = myStorageSettings.getBulkExportFileMaximumSize(); + long currentFileSize = 0; + for (String nextResourceType : resources.keySet()) { - ExpandedResourcesList output = new ExpandedResourcesList(); - output.setStringifiedResources(resources.get(nextResourceType)); - output.setResourceType(nextResourceType); - theDataSink.accept(output); + List stringifiedResources = resources.get(nextResourceType); + List currentFileStringifiedResources = new ArrayList<>(); - ourLog.info( - "Expanding of {} resources of type {} completed", - idList.getIds().size(), - idList.getResourceType()); + for (String nextStringifiedResource : stringifiedResources) { + + if (currentFileSize + nextStringifiedResource.length() > maxFileSize + && !currentFileStringifiedResources.isEmpty()) { + ExpandedResourcesList output = new ExpandedResourcesList(); + output.setStringifiedResources(currentFileStringifiedResources); + output.setResourceType(nextResourceType); + theDataSink.accept(output); + + currentFileStringifiedResources = new ArrayList<>(); + currentFileSize = 0; + } + + currentFileStringifiedResources.add(nextStringifiedResource); + currentFileSize += nextStringifiedResource.length(); + } + + if (!currentFileStringifiedResources.isEmpty()) { + ExpandedResourcesList output = new ExpandedResourcesList(); + output.setStringifiedResources(currentFileStringifiedResources); + output.setResourceType(nextResourceType); + theDataSink.accept(output); + } + + ourLog.info("Expanding of {} resources of type {} completed", idList.size(), data.getResourceType()); + } } // and return @@ -224,42 +253,36 @@ public class ExpandResourcesStep return false; } - private List fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) { + private List fetchAllResources( + List theIds, RequestPartitionId theRequestPartitionId) { ArrayListMultimap typeToIds = ArrayListMultimap.create(); - theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId())); + theIds.forEach(t -> typeToIds.put(t.getResourceType(), t.getId())); - List resources = new ArrayList<>(theIds.getIds().size()); + List resources = new ArrayList<>(theIds.size()); for (String resourceType : typeToIds.keySet()) { IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); List allIds = typeToIds.get(resourceType); - while (!allIds.isEmpty()) { - // Load in batches in order to avoid having too many PIDs go into a - // single SQ statement at once - int batchSize = Math.min(500, allIds.size()); + Set nextBatchOfPids = allIds.stream() + .map(t -> myIdHelperService.newPidFromStringIdAndResourceName(t, 
resourceType)) + .collect(Collectors.toSet()); - Set nextBatchOfPids = allIds.subList(0, batchSize).stream() - .map(t -> myIdHelperService.newPidFromStringIdAndResourceName(t, resourceType)) - .collect(Collectors.toSet()); - allIds = allIds.subList(batchSize, allIds.size()); + PersistentIdToForcedIdMap nextBatchOfResourceIds = myTransactionService + .withRequest(null) + .execute(() -> myIdHelperService.translatePidsToForcedIds(nextBatchOfPids)); - PersistentIdToForcedIdMap nextBatchOfResourceIds = myTransactionService - .withRequest(null) - .execute(() -> myIdHelperService.translatePidsToForcedIds(nextBatchOfPids)); - - TokenOrListParam idListParam = new TokenOrListParam(); - for (IResourcePersistentId nextPid : nextBatchOfPids) { - Optional resourceId = nextBatchOfResourceIds.get(nextPid); - idListParam.add(resourceId.orElse(nextPid.getId().toString())); - } - - SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam); - IBundleProvider outcome = - dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId)); - resources.addAll(outcome.getAllResources()); + TokenOrListParam idListParam = new TokenOrListParam(); + for (IResourcePersistentId nextPid : nextBatchOfPids) { + Optional resourceId = nextBatchOfResourceIds.get(nextPid); + idListParam.add(resourceId.orElse(nextPid.getId().toString())); } + + SearchParameterMap spMap = SearchParameterMap.newSynchronous().add(PARAM_ID, idListParam); + IBundleProvider outcome = + dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId)); + resources.addAll(outcome.getAllResources()); } return resources; diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java index 16ec8987a1d..e576f9362b4 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java @@ -102,6 +102,8 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker idsToSubmit = new ArrayList<>(); + int estimatedChunkSize = 0; + if (!pidIterator.hasNext()) { ourLog.debug("Bulk Export generated an iterator with no results!"); } @@ -121,17 +123,25 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker 0) { + // Account for comma between array entries + estimatedChunkSize++; + } + estimatedChunkSize += batchResourceId.estimateSerializedSize(); + // Make sure resources stored in each batch does not go over the max capacity - if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity()) { - submitWorkChunk(idsToSubmit, resourceType, params, theDataSink); + if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity() + || estimatedChunkSize >= myStorageSettings.getBulkExportFileMaximumSize()) { + submitWorkChunk(idsToSubmit, resourceType, theDataSink); submissionCount++; idsToSubmit = new ArrayList<>(); + estimatedChunkSize = 0; } } // if we have any other Ids left, submit them now if (!idsToSubmit.isEmpty()) { - submitWorkChunk(idsToSubmit, resourceType, params, theDataSink); + submitWorkChunk(idsToSubmit, resourceType, theDataSink); submissionCount++; } } @@ -150,7 +160,6 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker theBatchResourceIds, String theResourceType, - BulkExportJobParameters theParams, IJobDataSink 
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceId.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceId.java
index 61c3a5daf64..4070d4d1a20 100644
--- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceId.java
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/models/BatchResourceId.java
@@ -22,10 +22,13 @@ package ca.uhn.fhir.batch2.jobs.models;
 import ca.uhn.fhir.model.api.IModelJson;
 import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nonnull;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;

-public class BatchResourceId implements IModelJson {
+import static org.apache.commons.lang3.StringUtils.defaultString;
+
+public class BatchResourceId implements IModelJson, Comparable<BatchResourceId> {

 	@JsonProperty("type")
 	private String myResourceType;
@@ -77,6 +80,24 @@ public class BatchResourceId implements IModelJson {
 		return new HashCodeBuilder(17, 37).append(myResourceType).append(myId).toHashCode();
 	}

+	/**
+	 * Returns an estimate of how long the JSON serialized (non-pretty printed) form
+	 * of this object will be.
+	 */
+	public int estimateSerializedSize() {
+		// 19 chars: {"id":"","type":""}
+		return 19 + defaultString(myId).length() + defaultString(myResourceType).length();
+	}
+
+	@Override
+	public int compareTo(@Nonnull BatchResourceId o) {
+		int retVal = o.myResourceType.compareTo(myResourceType);
+		if (retVal == 0) {
+			retVal = o.myId.compareTo(myId);
+		}
+		return retVal;
+	}
+
 	public static BatchResourceId getIdFromPID(IResourcePersistentId thePID, String theResourceType) {
 		BatchResourceId batchResourceId = new BatchResourceId();
 		batchResourceId.setId(thePID.getId().toString());
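
The new BatchResourceId.estimateSerializedSize() above approximates the compact JSON form as 19 fixed characters of scaffolding plus the lengths of the two field values. A quick standalone sanity check of that arithmetic follows; it is an illustration, not part of the HAPI FHIR test suite.

// The compact JSON scaffolding {"id":"","type":""} is exactly 19 characters, so the
// serialized length is 19 plus the lengths of the two values.
public class EstimateSizeCheck {

	static int estimateSerializedSize(String theId, String theResourceType) {
		// Mirrors BatchResourceId: 19 chars of scaffolding plus both field lengths
		return 19 + theId.length() + theResourceType.length();
	}

	public static void main(String[] args) {
		String id = "123";
		String type = "Patient";

		// Hand-built compact JSON equivalent (field order does not affect the length)
		String json = "{\"id\":\"" + id + "\",\"type\":\"" + type + "\"}";

		System.out.println(json.length());                    // 29
		System.out.println(estimateSerializedSize(id, type)); // 29
	}
}
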
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStepTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStepTest.java
index 576da3269aa..48b63d094ec 100644
--- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStepTest.java
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourceAndWriteBinaryStepTest.java
@@ -12,6 +12,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.interceptor.executor.InterceptorService;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
 import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
@@ -112,7 +113,7 @@ public class ExpandResourceAndWriteBinaryStepTest {
 	private FhirContext myFhirContext = FhirContext.forR4Cached();

 	@Spy
-	private StorageSettings myStorageSettings = new StorageSettings();
+	private JpaStorageSettings myStorageSettings = new JpaStorageSettings();

 	@Spy
 	private IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStepTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStepTest.java
index 5ec874649fe..b1ce4c1d63a 100644
--- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStepTest.java
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/ExpandResourcesStepTest.java
@@ -4,14 +4,14 @@ package ca.uhn.fhir.batch2.jobs.export;
 import ca.uhn.fhir.batch2.api.IJobDataSink;
 import ca.uhn.fhir.batch2.api.RunOutcome;
 import ca.uhn.fhir.batch2.api.StepExecutionDetails;
-import ca.uhn.fhir.interceptor.executor.InterceptorService;
-import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
 import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
 import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
 import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.interceptor.executor.InterceptorService;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
 import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
@@ -20,8 +20,8 @@ import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
 import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
 import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
 import ca.uhn.fhir.jpa.model.dao.JpaPid;
-import ca.uhn.fhir.jpa.model.entity.StorageSettings;
 import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
 import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
 import ca.uhn.fhir.rest.server.SimpleBundleProvider;
 import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
@@ -75,7 +75,7 @@ public class ExpandResourcesStepTest {
 	private FhirContext myFhirContext = FhirContext.forR4Cached();

 	@Spy
-	private StorageSettings myStorageSettings = new StorageSettings();
+	private JpaStorageSettings myStorageSettings = new JpaStorageSettings();

 	@Spy
 	private IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
index da80c5bea69..14e717d7ebd 100644
--- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
@@ -130,6 +130,7 @@ public class FetchResourceIdsStepTest {
 			.thenReturn(observationIds.iterator());
 		int maxFileCapacity = 1000;
 		when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
+		when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);

 		// test
 		RunOutcome outcome = myFirstStep.run(input, sink);
@@ -191,6 +192,7 @@ public class FetchResourceIdsStepTest {
 		// when
 		int maxFileCapacity = 5;
 		when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
+		when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);

 		for (int i = 0; i <= maxFileCapacity; i++) {
 			JpaPid id = JpaPid.fromId((long) i);
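
The JpaStorageSettings change below introduces the new bulkExportFileMaximumSize limit (default 100 MB) alongside the existing per-file resource-count cap. The following is a minimal configuration sketch showing how a deployment might tune both limits, assuming a typical Spring @Bean set-up for JpaStorageSettings; the configuration class and bean wiring are illustrative, and only the two setters come from the diff below.

// Hypothetical Spring configuration sketch; only the two setters are part of this change.
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import org.apache.commons.io.FileUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BulkExportStorageConfig {

	@Bean
	public JpaStorageSettings storageSettings() {
		JpaStorageSettings settings = new JpaStorageSettings();

		// Existing cap: at most 1000 resources per bulk export output file (the default)
		settings.setBulkExportFileMaximumCapacity(1000);

		// New in 7.2.0: cap the estimated size of each work chunk / output file.
		// Default is 100 MB; here we assume a deployment that wants a tighter 25 MB limit.
		settings.setBulkExportFileMaximumSize(25 * FileUtils.ONE_MB);

		return settings;
	}
}
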
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/config/JpaStorageSettings.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/config/JpaStorageSettings.java
index 2c86b83a33c..294b72da983 100644
--- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/config/JpaStorageSettings.java
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/config/JpaStorageSettings.java
@@ -49,6 +49,10 @@ import java.util.TreeSet;
 @SuppressWarnings("JavadocLinkAsPlainText")
 public class JpaStorageSettings extends StorageSettings {

+	/**
+	 * Default value for {@link #getBulkExportFileMaximumSize()}: 100 MB
+	 */
+	public static final long DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE = 100 * FileUtils.ONE_MB;
 	/**
 	 * Default value for {@link #setReuseCachedSearchResultsForMillis(Long)}: 60000ms (one minute)
 	 */
@@ -313,6 +317,10 @@ public class JpaStorageSettings extends StorageSettings {
 	 * Since 6.2.0
 	 */
 	private int myBulkExportFileMaximumCapacity = DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY;
+	/**
+	 * Since 7.2.0
+	 */
+	private long myBulkExportFileMaximumSize = DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE;
 	/**
 	 * Since 6.4.0
 	 */
@@ -2301,11 +2309,42 @@ public class JpaStorageSettings extends StorageSettings {
 	 * Default is 1000 resources per file.
 	 *
 	 * @since 6.2.0
+	 * @see #setBulkExportFileMaximumCapacity(int)
 	 */
 	public void setBulkExportFileMaximumCapacity(int theBulkExportFileMaximumCapacity) {
 		myBulkExportFileMaximumCapacity = theBulkExportFileMaximumCapacity;
 	}

+	/**
+	 * Defines the maximum size for a single work chunk or report file to be held in
+	 * memory or stored in the database for bulk export jobs.
+	 * Note that the framework will attempt to not exceed this limit, but will only
+	 * estimate the actual chunk size as it works, so this value should be set
+	 * below any hard limits that may be present.
+	 *
+	 * @since 7.2.0
+	 * @see #DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE The default value for this setting
+	 */
+	public long getBulkExportFileMaximumSize() {
+		return myBulkExportFileMaximumSize;
+	}
+
+	/**
+	 * Defines the maximum size for a single work chunk or report file to be held in
+	 * memory or stored in the database for bulk export jobs. Default is 100 MB.
+	 * Note that the framework will attempt to not exceed this limit, but will only
+	 * estimate the actual chunk size as it works, so this value should be set
+	 * below any hard limits that may be present.
+	 *
+	 * @since 7.2.0
+	 * @see #setBulkExportFileMaximumCapacity(int)
+	 * @see #DEFAULT_BULK_EXPORT_MAXIMUM_WORK_CHUNK_SIZE The default value for this setting
+	 */
+	public void setBulkExportFileMaximumSize(long theBulkExportFileMaximumSize) {
+		Validate.isTrue(theBulkExportFileMaximumSize > 0, "theBulkExportFileMaximumSize must be positive");
+		myBulkExportFileMaximumSize = theBulkExportFileMaximumSize;
+	}
+
 	/**
 	 * If this setting is enabled, then gated batch jobs that produce only one chunk will immediately trigger a batch
 	 * maintenance job. This may be useful for testing, but is not recommended for production use.