From ec8aad3e78dc739f78ada9bd1beb53f28dcdd428 Mon Sep 17 00:00:00 2001 From: Martha Date: Thu, 27 Jun 2024 15:56:09 -0700 Subject: [PATCH] Cleanup and enhance batch2 jobs to support running them across multiple partitions. --- .../ca/uhn/fhir/i18n/hapi-messages.properties | 2 - .../jpa/dao/r4/BasePartitioningR4Test.java | 4 +- .../jpa/dao/r4/PartitioningSqlR4Test.java | 1 + .../uhn/fhir/jpa/dao/tx/ReindexStepTest.java | 7 +- .../jpa/delete/job/DeleteExpungeJobTest.java | 5 +- .../fhir/jpa/delete/job/ReindexJobTest.java | 23 ++ .../job/ReindexJobWithPartitioningTest.java | 100 ++++++++ .../PartitioningInterceptorR4Test.java | 2 +- .../RequestTenantPartitionInterceptor.java | 5 + .../server/provider/ProviderConstants.java | 1 + .../jobs/expunge/DeleteExpungeAppCtx.java | 4 +- .../expunge/DeleteExpungeJobParameters.java | 4 +- .../DeleteExpungeJobParametersValidator.java | 4 +- .../jobs/imprt/BulkDataImportProvider.java | 7 +- .../batch2/jobs/reindex/ReindexAppCtx.java | 15 +- .../jobs/reindex/ReindexJobParameters.java | 4 +- .../batch2/jobs/reindex/ReindexProvider.java | 16 +- .../fhir/batch2/jobs/reindex/ReindexStep.java | 2 +- .../ReindexJobParametersValidatorTest.java | 4 +- .../jobs/reindex/ReindexProviderTest.java | 20 +- .../batch2/api/IJobPartitionProvider.java | 24 ++ .../fhir/batch2/config/BaseBatch2Config.java | 8 + .../coordinator/JobPartitionProvider.java | 25 ++ .../batch2/jobs/chunk/ChunkRangeJson.java | 54 +++- .../chunk/PartitionedUrlChunkRangeJson.java | 39 --- .../batch2/jobs/parameters/JobParameters.java | 116 +++++++++ .../parameters/PartitionedJobParameters.java | 53 ---- .../jobs/parameters/PartitionedUrl.java | 6 +- .../PartitionedUrlListJobParameters.java | 53 ---- .../fhir/batch2/jobs/step/ChunkProducer.java | 50 ++++ .../jobs/step/GenerateRangeChunksStep.java | 92 +++++-- .../batch2/jobs/step/IIdChunkProducer.java | 13 +- .../fhir/batch2/jobs/step/LoadIdsStep.java | 15 +- .../PartitionedUrlListIdChunkProducer.java | 67 ----- .../batch2/jobs/step/ResourceIdListStep.java | 26 +- .../batch2/model/JobInstanceStartRequest.java | 6 + .../coordinator/JobPartitionProviderTest.java | 42 ++++ .../batch2/jobs/step/ChunkProducerTest.java | 53 ++++ .../step/GenerateRangeChunksStepTest.java | 140 +++++++++++ .../batch2/jobs/step/LoadIdsStepTest.java | 25 +- .../jobs/step/ResourceIdListStepTest.java | 29 ++- hapi-fhir-storage-mdm/pom.xml | 2 +- .../fhir/mdm/batch2/LoadGoldenIdsStep.java | 9 +- .../uhn/fhir/mdm/batch2/MdmBatch2Config.java | 3 +- .../fhir/mdm/batch2/MdmChunkRangeJson.java | 40 --- .../batch2/MdmGenerateRangeChunksStep.java | 13 +- .../fhir/mdm/batch2/MdmIdChunkProducer.java | 16 +- .../fhir/mdm/batch2/clear/MdmClearAppCtx.java | 4 +- .../batch2/clear/MdmClearJobParameters.java | 4 +- .../fhir/mdm/batch2/clear/MdmClearStep.java | 3 +- .../mdm/batch2/submit/MdmSubmitAppCtx.java | 14 +- .../batch2/submit/MdmSubmitJobParameters.java | 4 +- .../MdmSubmitJobParametersValidator.java | 16 +- .../BaseRequestPartitionHelperSvc.java | 232 ++++++++++-------- 54 files changed, 992 insertions(+), 534 deletions(-) create mode 100644 hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java delete mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/PartitionedUrlChunkRangeJson.java create 
mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/JobParameters.java delete mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedJobParameters.java delete mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrlListJobParameters.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducer.java delete mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/PartitionedUrlListIdChunkProducer.java create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProviderTest.java create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducerTest.java create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java delete mode 100644 hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmChunkRangeJson.java diff --git a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties index ef31da44191..686f1452d6d 100644 --- a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties +++ b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties @@ -204,8 +204,6 @@ ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.invalidName=Partition name "{0} ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDuplicatePartitionName=Partition name "{0}" is already defined ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDefaultPartition=Can not create partition with name "DEFAULT" -ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor.unknownTenantName=Unknown tenant: {0} - ca.uhn.fhir.jpa.dao.HistoryBuilder.noSystemOrTypeHistoryForPartitionAwareServer=Type- and Server- level history operation not supported across partitions on partitioned server ca.uhn.fhir.jpa.provider.DiffProvider.cantDiffDifferentTypes=Unable to diff two resources of different types diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java index 93710e4f63c..1507e9487ed 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import ca.uhn.fhir.interceptor.api.Hook; import ca.uhn.fhir.interceptor.api.Interceptor; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.entity.PartitionEntity; @@ -277,7 +278,8 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest { } @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ) - public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails) { + public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails, + ReadPartitionIdRequestDetails theDetails) { // Just to be nice, figure out the first line in the stack that isn't a part of the // partitioning or interceptor infrastructure, just so it's 
obvious who is asking diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java index 69b55c35813..76b738d588d 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java @@ -2883,6 +2883,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { createPatient(withPartition(1), withActiveTrue()); myCaptureQueriesListener.clear(); + addReadDefaultPartition(); Bundle outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java index abb988e04bf..bc178d513a4 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java @@ -15,6 +15,8 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; @@ -40,9 +42,10 @@ public class ReindexStepTest { public void testMethodReindex_withRequestPartitionId_willExecuteWithPartitionId(){ // given Integer expectedPartitionId = 1; - ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(); + RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(expectedPartitionId); + ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(List.of(), partitionId); ReindexJobParameters reindexJobParameters = new ReindexJobParameters(); - reindexJobParameters.setRequestPartitionId(RequestPartitionId.fromPartitionId(expectedPartitionId)); + reindexJobParameters.setRequestPartitionId(partitionId); when(myHapiTransactionService.withRequest(any())).thenCallRealMethod(); when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod(); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java index f6e792c0eaa..d70ff1085e9 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java @@ -20,8 +20,8 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; public class DeleteExpungeJobTest extends BaseJpaR4Test { @Autowired @@ -62,7 +62,8 @@ public class DeleteExpungeJobTest extends BaseJpaR4Test { assertEquals(2, myDiagnosticReportDao.search(SearchParameterMap.newSynchronous()).size()); DeleteExpungeJobParameters jobParameters = new DeleteExpungeJobParameters(); - 
jobParameters.addUrl("Observation?subject.active=false").addUrl("DiagnosticReport?subject.active=false"); + jobParameters.addUrl("Observation?subject.active=false"); + jobParameters.addUrl("DiagnosticReport?subject.active=false"); JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); startRequest.setParameters(jobParameters); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java index 4cbdcdd56c4..1936483e496 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java @@ -311,6 +311,29 @@ public class ReindexJobTest extends BaseJpaR4Test { myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(reindexPropertyCache); } + @Test + public void testReindex_byMultipleUrls_indexesMatchingResources() { + // setup + createObservation(withStatus(Observation.ObservationStatus.FINAL.toCode())); + createObservation(withStatus(Observation.ObservationStatus.CANCELLED.toCode())); + createPatient(withActiveTrue()); + createPatient(withActiveFalse()); + + // Only reindex one of them + ReindexJobParameters parameters = new ReindexJobParameters(); + parameters.addUrl("Observation?status=final"); + parameters.addUrl("Patient?"); + + // execute + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters(parameters); + Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest); + JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res); + + assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(3); + } + @Test public void testReindexDeletedResources_byUrl_willRemoveDeletedResourceEntriesFromIndexTables(){ IIdType obsId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java new file mode 100644 index 00000000000..311641f0a06 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java @@ -0,0 +1,100 @@ +package ca.uhn.fhir.jpa.delete.job; + +import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.entity.PartitionEntity; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; +import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor; +import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Patient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { + @Autowired + private IJobCoordinator myJobCoordinator; + private final RequestTenantPartitionInterceptor myPartitionInterceptor = new RequestTenantPartitionInterceptor(); + + @BeforeEach + public void before() { + myInterceptorRegistry.registerInterceptor(myPartitionInterceptor); + myPartitionSettings.setPartitioningEnabled(true); + + myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("TestPartition1"), null); + myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName("TestPartition2"), null); + + RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1); + RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2); + + Observation observation1 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode())); + myObservationDao.create(observation1, new SystemRequestDetails().setRequestPartitionId(partition1)); + Observation observation2 = buildResource("Observation", withStatus(Observation.ObservationStatus.REGISTERED.toCode())); + myObservationDao.create(observation2, new SystemRequestDetails().setRequestPartitionId(partition1)); + Observation observation3 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode())); + myObservationDao.create(observation3, new SystemRequestDetails().setRequestPartitionId(partition2)); + + Patient patient1 = buildResource("Patient", withActiveTrue()); + myPatientDao.create(patient1, new SystemRequestDetails().setRequestPartitionId(partition1)); + Patient patient2 = buildResource("Patient", withActiveFalse()); + myPatientDao.create(patient2, new SystemRequestDetails().setRequestPartitionId(partition2)); + } + + @AfterEach + public void after() { + myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor); + myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled()); + } + + public static Stream<Arguments> getReindexParameters() { + List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2)); + List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1)); + return Stream.of( + Arguments.of(List.of(), List.of(), false, 5), + Arguments.of(List.of("Observation?"), allPartitions, false, 3), + Arguments.of(List.of("Observation?"), List.of(), false, 0), + Arguments.of(List.of("Observation?"), partition1, true, 2), + Arguments.of(List.of("Observation?", "Patient?"), allPartitions, false, 5), + Arguments.of(List.of("Observation?", "Patient?"), allPartitions, true, 3), + Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, false, 4), + Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, true, 2), + Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2) + ); + } + + @ParameterizedTest + @MethodSource(value = "getReindexParameters") + public void testReindex_byMultipleUrlsAndPartitions_indexesMatchingResources(List<String> theUrls, + List<RequestPartitionId> thePartitions, + boolean theShouldAssignPartitionToUrl, + int theExpectedIndexedResourceCount) { + + JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl); + + // execute +
JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters(parameters); + Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); + JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res); + + // verify only resources matching URLs and partitions provided via parameters were re-indexed + assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(theExpectedIndexedResourceCount); + } +} diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/interceptor/PartitioningInterceptorR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/interceptor/PartitioningInterceptorR4Test.java index 95256cf8f8a..5a59774055b 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/interceptor/PartitioningInterceptorR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/interceptor/PartitioningInterceptorR4Test.java @@ -200,7 +200,7 @@ public class PartitioningInterceptorR4Test extends BaseJpaR4SystemTest { myPatientDao.search(map); fail(); } catch (InternalErrorException e) { - assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcut: STORAGE_PARTITION_IDENTIFY_READ", e.getMessage()); + assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcuts: [STORAGE_PARTITION_IDENTIFY_ANY, STORAGE_PARTITION_IDENTIFY_READ]", e.getMessage()); } } diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java index d9edd76ac15..4791575c1cc 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; import ca.uhn.fhir.rest.server.tenant.ITenantIdentificationStrategy; import jakarta.annotation.Nonnull; @@ -57,6 +58,7 @@ public class RequestTenantPartitionInterceptor { // We will use the tenant ID that came from the request as the partition name String tenantId = theRequestDetails.getTenantId(); if (isBlank(tenantId)) { + // this case happens when "partitioning.tenant_identification_strategy" is set to URL_BASED if (theRequestDetails instanceof SystemRequestDetails) { SystemRequestDetails requestDetails = (SystemRequestDetails) theRequestDetails; if (requestDetails.getRequestPartitionId() != null) { @@ -67,6 +69,9 @@ public class RequestTenantPartitionInterceptor { throw new InternalErrorException(Msg.code(343) + "No partition ID has been specified"); } + if (tenantId.equals(ProviderConstants.ALL_PARTITIONS_NAME)) { + return RequestPartitionId.allPartitions(); + } return RequestPartitionId.fromPartitionName(tenantId); } } diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java index 114e2ecdd0b..d0972c6a338 100644 ---
a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java @@ -53,6 +53,7 @@ public class ProviderConstants { public static final String PARTITION_MANAGEMENT_PARTITION_DESC = "description"; public static final String DEFAULT_PARTITION_NAME = "DEFAULT"; + public static final String ALL_PARTITIONS_NAME = "_ALL"; /** * Operation name: diff diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeAppCtx.java index aa0a03ef6a9..d785f0a2da2 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeAppCtx.java @@ -19,7 +19,7 @@ */ package ca.uhn.fhir.batch2.jobs.expunge; -import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; @@ -59,7 +59,7 @@ public class DeleteExpungeAppCtx { .addFirstStep( "generate-ranges", "Generate data ranges to expunge", - PartitionedUrlChunkRangeJson.class, + ChunkRangeJson.class, expungeGenerateRangeChunksStep()) .addIntermediateStep( "load-ids", diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParameters.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParameters.java index ebed8801712..fd57f9c78f7 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParameters.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParameters.java @@ -19,10 +19,10 @@ */ package ca.uhn.fhir.batch2.jobs.expunge; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import com.fasterxml.jackson.annotation.JsonProperty; -public class DeleteExpungeJobParameters extends PartitionedUrlListJobParameters { +public class DeleteExpungeJobParameters extends JobParameters { @JsonProperty("cascade") private boolean myCascade; diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParametersValidator.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParametersValidator.java index fcb3381b6a8..e45003b5343 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParametersValidator.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/expunge/DeleteExpungeJobParametersValidator.java @@ -26,10 +26,10 @@ import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc; import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.util.ValidateUtil; -import jakarta.annotation.Nonnull; -import jakarta.annotation.Nullable; import java.util.List; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; public class DeleteExpungeJobParametersValidator implements IJobParametersValidator<DeleteExpungeJobParameters> { private final IUrlListValidator
myUrlListValidator; diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProvider.java index ca1b4b3032f..d11df7d5dbe 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/BulkDataImportProvider.java @@ -159,10 +159,9 @@ public class BulkDataImportProvider { RequestPartitionId partitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForServerOperation( theRequestDetails, JpaConstants.OPERATION_IMPORT); - if (!partitionId.isAllPartitions()) { - myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId); - jobParameters.setPartitionId(partitionId); - } + // TODO MM: I believe this is already checked as part of + myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId); + jobParameters.setPartitionId(partitionId); // Extract all the URLs and order them in the order that is least // likely to result in conflict (e.g. Patients before Observations diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java index 50089c5afff..b43271b1676 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java @@ -20,9 +20,10 @@ package ca.uhn.fhir.batch2.jobs.reindex; import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.IJobStepWorker; import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner; @@ -31,7 +32,6 @@ import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; -import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.rest.server.provider.ProviderConstants; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -53,7 +53,7 @@ public class ReindexAppCtx { .addFirstStep( "generate-ranges", "Generate data ranges to reindex", - PartitionedUrlChunkRangeJson.class, + ChunkRangeJson.class, reindexGenerateRangeChunksStep()) .addIntermediateStep( "load-ids", @@ -65,13 +65,12 @@ public class ReindexAppCtx { } @Bean - public IJobStepWorker<ReindexJobParameters, VoidModel, PartitionedUrlChunkRangeJson> - reindexGenerateRangeChunksStep() { + public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() { return new GenerateRangeChunksStep<>(); } @Bean - public IJobStepWorker<ReindexJobParameters, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> loadIdsStep( + public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> loadIdsStep( IBatch2DaoSvc theBatch2DaoSvc) { return new LoadIdsStep<>(theBatch2DaoSvc); } @@ -91,8 +90,8 @@ public class ReindexAppCtx { public ReindexProvider reindexProvider( FhirContext theFhirContext, IJobCoordinator theJobCoordinator, - IRequestPartitionHelperSvc theRequestPartitionHelperSvc, + IJobPartitionProvider
theJobPartitionHandler, UrlPartitioner theUrlPartitioner) { - return new ReindexProvider(theFhirContext, theJobCoordinator, theRequestPartitionHelperSvc, theUrlPartitioner); + return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler, theUrlPartitioner); } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParameters.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParameters.java index cdc1830df5f..2a5f4131888 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParameters.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParameters.java @@ -19,14 +19,14 @@ */ package ca.uhn.fhir.batch2.jobs.reindex; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import ca.uhn.fhir.jpa.api.dao.ReindexParameters; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nullable; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; -public class ReindexJobParameters extends PartitionedUrlListJobParameters { +public class ReindexJobParameters extends JobParameters { public static final String OPTIMIZE_STORAGE = "optimizeStorage"; public static final String REINDEX_SEARCH_PARAMETERS = "reindexSearchParameters"; diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java index 1c843474a35..6677c52e067 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java @@ -20,13 +20,12 @@ package ca.uhn.fhir.batch2.jobs.reindex; import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.dao.ReindexParameters; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; -import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.model.api.annotation.Description; import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.OperationParam; @@ -50,7 +49,7 @@ public class ReindexProvider { private final FhirContext myFhirContext; private final IJobCoordinator myJobCoordinator; - private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + private final IJobPartitionProvider myJobPartitionProvider; private final UrlPartitioner myUrlPartitioner; /** @@ -59,11 +58,11 @@ public class ReindexProvider { public ReindexProvider( FhirContext theFhirContext, IJobCoordinator theJobCoordinator, - IRequestPartitionHelperSvc theRequestPartitionHelperSvc, + IJobPartitionProvider theJobPartitionProvider, UrlPartitioner theUrlPartitioner) { myFhirContext = theFhirContext; myJobCoordinator = theJobCoordinator; - myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; + myJobPartitionProvider = theJobPartitionProvider; myUrlPartitioner = theUrlPartitioner; } @@ -128,10 +127,9 @@ public class ReindexProvider { .forEach(params::addPartitionedUrl); } - RequestPartitionId requestPartition 
= - myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation( - theRequestDetails, ProviderConstants.OPERATION_REINDEX); - params.setRequestPartitionId(requestPartition); + myJobPartitionProvider + .getPartitions(theRequestDetails, ProviderConstants.OPERATION_REINDEX) + .forEach(params::addRequestPartitionId); JobInstanceStartRequest request = new JobInstanceStartRequest(); request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java index 29ad0028c92..2644f3af89f 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java @@ -103,7 +103,7 @@ public class ReindexStep implements IJobStepWorker errors = myValidator.validate(null, parameters); - - return errors; + return myValidator.validate(null, parameters); } } diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProviderTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProviderTest.java index c79ea92895f..30185eea738 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProviderTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProviderTest.java @@ -1,9 +1,7 @@ package ca.uhn.fhir.batch2.jobs.reindex; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertFalse; import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; @@ -33,9 +31,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -59,6 +63,8 @@ public class ReindexProviderTest { private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; @Mock private UrlPartitioner myUrlPartitioner; + @Mock + private IJobPartitionProvider myJobPartitionProvider; @Captor private ArgumentCaptor myStartRequestCaptor; @@ -72,7 +78,7 @@ public class ReindexProviderTest { when(myJobCoordinator.startInstance(isNotNull(), any())) .thenReturn(createJobStartResponse()); - when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(any(), any())).thenReturn(RequestPartitionId.allPartitions()); + when(myJobPartitionProvider.getPartitions(any(), any())).thenReturn(List.of(RequestPartitionId.allPartitions())); } private Batch2JobStartResponse createJobStartResponse() { @@ -96,11 +102,7 @@ public class ReindexProviderTest 
{ input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize)); ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input)); - - PartitionedUrl partitionedUrl = new PartitionedUrl(); - partitionedUrl.setUrl(url); - partitionedUrl.setRequestPartitionId(RequestPartitionId.defaultPartition()); - when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(partitionedUrl); + when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(new PartitionedUrl().setUrl(url).setRequestPartitionId(RequestPartitionId.defaultPartition())); // Execute diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java new file mode 100644 index 00000000000..3bd0a6bb2ee --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java @@ -0,0 +1,24 @@ +package ca.uhn.fhir.batch2.api; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.rest.api.server.RequestDetails; + +import java.util.List; + +/** + * Provides the list of partitions that a job should run against. + * TODO MM: Consider moving UrlPartitioner calls to this class once other operations need to be MegaScale enabled. + * That way all partitioning related logic exists only here for batch jobs. + * After that PartitionedUrl#myRequestPartitionId can be marked as deprecated. + */ +public interface IJobPartitionProvider { + /** + * Provides the list of partitions to run job steps against, based on the request that initiates the job. + * @param theRequestDetails the request details + * @param theOperation the operation being run which corresponds to the job + * @return the list of partitions + */ + List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation); + + // List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation, String theUrls); +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index d2bff951f83..d403ee0f287 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -21,11 +21,13 @@ package ca.uhn.fhir.batch2.config; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobMaintenanceService; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; +import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider; import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl; import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl; @@ -33,6 +35,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; @@ -139,4 +142,9 @@ public abstract class BaseBatch2Config { protected int getConcurrentConsumers() { return 4; } + + @Bean + public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { + return new JobPartitionProvider(theRequestPartitionHelperSvc); + } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java new file mode 100644 index 00000000000..88dc0cfc4a1 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java @@ -0,0 +1,25 @@ +package ca.uhn.fhir.batch2.coordinator; + +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.RequestDetails; + +import java.util.List; + +/** + * The default implementation, which uses {@link IRequestPartitionHelperSvc} to compute the partition to run a batch2 job. + */ +public class JobPartitionProvider implements IJobPartitionProvider { + private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + + public JobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { + myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; + } + + @Override + public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) { + return List.of(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation( + theRequestDetails, theOperation)); + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ChunkRangeJson.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ChunkRangeJson.java index 7e9fa85b947..e162f6756da 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ChunkRangeJson.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ChunkRangeJson.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.batch2.jobs.chunk; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.rest.server.util.JsonDateDeserializer; import ca.uhn.fhir.rest.server.util.JsonDateSerializer; @@ -26,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import java.util.Date; @@ -42,23 +44,61 @@ public class ChunkRangeJson implements IModelJson { @Nonnull private Date myEnd; + @Nullable + @JsonProperty("url") + private String myUrl; + + @JsonProperty("resourceType") + private String myResourceType; + + @Nullable + @JsonProperty("partitionId") + private RequestPartitionId myPartitionId; + + public ChunkRangeJson() {} + + public ChunkRangeJson(@Nonnull Date theStart, @Nonnull Date theEnd) { + this.myStart = theStart; + this.myEnd = theEnd; + } + @Nonnull public Date getStart() { return myStart; } - public ChunkRangeJson setStart(@Nonnull Date theStart) { - myStart = theStart; - return this; - } - @Nonnull public Date getEnd() { return myEnd; } - public ChunkRangeJson setEnd(@Nonnull Date theEnd) { - myEnd = theEnd; +
@Nullable + public String getUrl() { + return myUrl; + } + + public ChunkRangeJson setUrl(@Nullable String theUrl) { + myUrl = theUrl; + return this; + } + + @Nonnull + public String getResourceType() { + return myResourceType; + } + + public ChunkRangeJson setResourceType(@Nullable String theResourceType) { + myResourceType = theResourceType; + return this; + } + + @Nullable + public RequestPartitionId getPartitionId() { + return myPartitionId; + } + + public ChunkRangeJson setPartitionId(@Nullable RequestPartitionId thePartitionId) { + myPartitionId = thePartitionId; return this; } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/PartitionedUrlChunkRangeJson.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/PartitionedUrlChunkRangeJson.java deleted file mode 100644 index ae4e494fa49..00000000000 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/PartitionedUrlChunkRangeJson.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * #%L - * HAPI FHIR JPA Server - Batch2 Task Processor - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.chunk; - -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.annotation.Nullable; - -public class PartitionedUrlChunkRangeJson extends ChunkRangeJson { - @Nullable - @JsonProperty("partitionedUrl") - private PartitionedUrl myPartitionedUrl; - - @Nullable - public PartitionedUrl getPartitionedUrl() { - return myPartitionedUrl; - } - - public void setPartitionedUrl(@Nullable PartitionedUrl thePartitionedUrl) { - myPartitionedUrl = thePartitionedUrl; - } -} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/JobParameters.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/JobParameters.java new file mode 100644 index 00000000000..6334cb4e803 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/JobParameters.java @@ -0,0 +1,116 @@ +/*- + * #%L + * HAPI FHIR JPA Server - Batch2 Task Processor + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package ca.uhn.fhir.batch2.jobs.parameters; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * Can be used to configure parameters for batch2 jobs. + * Please note that these need to be backward compatible as we do not have a way to migrate them to a different structure at the moment. + */ +public class JobParameters implements IModelJson { + @JsonProperty(value = "partitionId") + private List<RequestPartitionId> myRequestPartitionIds; + + @JsonProperty("batchSize") + private Integer myBatchSize; + + @JsonProperty("partitionedUrl") + private List<PartitionedUrl> myPartitionedUrls; + + public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) { + if (theRequestPartitionId != null) { + myRequestPartitionIds = List.of(theRequestPartitionId); + } + } + + @Nullable + public RequestPartitionId getRequestPartitionId() { + return getFirstRequestPartitionIdOrNull(); + } + + @Nullable + private RequestPartitionId getFirstRequestPartitionIdOrNull() { + return myRequestPartitionIds == null || myRequestPartitionIds.isEmpty() ? null : myRequestPartitionIds.get(0); + } + + @Nonnull + public List<RequestPartitionId> getRequestPartitionIds() { + if (myRequestPartitionIds == null) { + myRequestPartitionIds = new ArrayList<>(); + } + return myRequestPartitionIds; + } + + public void addRequestPartitionId(RequestPartitionId theRequestPartitionId) { + getRequestPartitionIds().add(theRequestPartitionId); + } + + public void setBatchSize(int theBatchSize) { + myBatchSize = theBatchSize; + } + + @Nullable + public Integer getBatchSize() { + return myBatchSize; + } + + public List<PartitionedUrl> getPartitionedUrls() { + if (myPartitionedUrls == null) { + myPartitionedUrls = new ArrayList<>(); + } + return myPartitionedUrls; + } + + public void addPartitionedUrl(@Nonnull PartitionedUrl theUrl) { + getPartitionedUrls().add(theUrl); + } + + public void addUrl(@Nonnull String theUrl) { + getPartitionedUrls().add(new PartitionedUrl().setUrl(theUrl)); + } + + @VisibleForTesting + public static JobParameters from( + List<String> theUrls, List<RequestPartitionId> thePartitions, boolean theShouldAssignPartitionToUrl) { + JobParameters parameters = new JobParameters(); + if (theShouldAssignPartitionToUrl) { + assert theUrls.size() == thePartitions.size(); + for (int i = 0; i < theUrls.size(); i++) { + PartitionedUrl partitionedUrl = new PartitionedUrl(); + partitionedUrl.setUrl(theUrls.get(i)); + partitionedUrl.setRequestPartitionId(thePartitions.get(i)); + parameters.addPartitionedUrl(partitionedUrl); + } + } else { + theUrls.forEach(url -> parameters.addPartitionedUrl(new PartitionedUrl().setUrl(url))); + thePartitions.forEach(parameters::addRequestPartitionId); + } + return parameters; + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedJobParameters.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedJobParameters.java deleted file mode 100644 index 520793fe02a..00000000000 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedJobParameters.java +++ /dev/null @@ -1,53 +0,0 @@ -/*- - * #%L - * HAPI FHIR JPA Server - Batch2 Task Processor - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc.
- * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.parameters; - -import ca.uhn.fhir.interceptor.model.RequestPartitionId; -import ca.uhn.fhir.model.api.IModelJson; -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.annotation.Nullable; - -public class PartitionedJobParameters implements IModelJson { - @JsonProperty(value = "partitionId") - @Nullable - private RequestPartitionId myRequestPartitionId; - - @JsonProperty("batchSize") - @Nullable - private Integer myBatchSize; - - @Nullable - public RequestPartitionId getRequestPartitionId() { - return myRequestPartitionId; - } - - public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) { - myRequestPartitionId = theRequestPartitionId; - } - - public void setBatchSize(int theBatchSize) { - myBatchSize = theBatchSize; - } - - @Nullable - public Integer getBatchSize() { - return myBatchSize; - } -} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java index ea3ba4ffb0b..b29f215344e 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java @@ -48,15 +48,17 @@ public class PartitionedUrl implements IModelJson { return myUrl; } - public void setUrl(String theUrl) { + public PartitionedUrl setUrl(String theUrl) { myUrl = theUrl; + return this; } public RequestPartitionId getRequestPartitionId() { return myRequestPartitionId; } - public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) { + public PartitionedUrl setRequestPartitionId(RequestPartitionId theRequestPartitionId) { myRequestPartitionId = theRequestPartitionId; + return this; } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrlListJobParameters.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrlListJobParameters.java deleted file mode 100644 index f8677731c9f..00000000000 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrlListJobParameters.java +++ /dev/null @@ -1,53 +0,0 @@ -/*- - * #%L - * HAPI FHIR JPA Server - Batch2 Task Processor - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.parameters; - -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.annotation.Nonnull; -import jakarta.annotation.Nullable; -import org.apache.commons.lang3.Validate; - -import java.util.ArrayList; -import java.util.List; - -public class PartitionedUrlListJobParameters extends PartitionedJobParameters { - @JsonProperty("partitionedUrl") - @Nullable - private List myPartitionedUrls; - - public List getPartitionedUrls() { - if (myPartitionedUrls == null) { - myPartitionedUrls = new ArrayList<>(); - } - return myPartitionedUrls; - } - - public PartitionedUrlListJobParameters addPartitionedUrl(@Nonnull PartitionedUrl thePartitionedUrl) { - Validate.notNull(thePartitionedUrl); - getPartitionedUrls().add(thePartitionedUrl); - return this; - } - - public PartitionedUrlListJobParameters addUrl(@Nonnull String theUrl) { - PartitionedUrl partitionedUrl = new PartitionedUrl(); - partitionedUrl.setUrl(theUrl); - return addPartitionedUrl(partitionedUrl); - } -} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducer.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducer.java new file mode 100644 index 00000000000..98f5823bbdd --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducer.java @@ -0,0 +1,50 @@ +/*- + * #%L + * HAPI FHIR JPA Server - Batch2 Task Processor + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package ca.uhn.fhir.batch2.jobs.step; + +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; +import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import ca.uhn.fhir.util.Logs; +import org.slf4j.Logger; + +public class ChunkProducer implements IIdChunkProducer<ChunkRangeJson> { + private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + private final IBatch2DaoSvc myBatch2DaoSvc; + + public ChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) { + myBatch2DaoSvc = theBatch2DaoSvc; + } + + @Override + public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) { + String url = theData.getUrl(); + RequestPartitionId targetPartitionId = theData.getPartitionId(); + ourLog.info( + "Fetching resource ID chunk in partition {} for URL {} - Range {} - {}", + targetPartitionId, + url, + theData.getStart(), + theData.getEnd()); + + return myBatch2DaoSvc.fetchResourceIdStream(theData.getStart(), theData.getEnd(), targetPartitionId, url); + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java index 17b91325bdf..5448fa959b5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java @@ -25,49 +25,103 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.util.Logs; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import java.util.Date; +import java.util.List; import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE; -public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters> - implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> { +public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirstJobStepWorker<PT, ChunkRangeJson> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @Nonnull @Override public RunOutcome run( @Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails, - @Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink) + @Nonnull IJobDataSink<ChunkRangeJson> theDataSink) throws JobExecutionFailedException { PT params = theStepExecutionDetails.getParameters(); Date start = BATCH_START_DATE; Date end = new Date(); - if (params.getPartitionedUrls().isEmpty()) { - ourLog.info("Searching for All Resources from {} to {}", start, end); - PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson(); - nextRange.setStart(start); - nextRange.setEnd(end); - theDataSink.accept(nextRange); - } else { - for (PartitionedUrl nextPartitionedUrl : params.getPartitionedUrls()) { - ourLog.info("Searching for [{}]] from {} to {}", nextPartitionedUrl, start, end); - PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson(); - nextRange.setPartitionedUrl(nextPartitionedUrl); - nextRange.setStart(start); -
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java
index 17b91325bdf..5448fa959b5 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java
@@ -25,49 +25,103 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
 import ca.uhn.fhir.batch2.api.RunOutcome;
 import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.api.VoidModel;
-import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
+import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
+import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
-import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
+import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.util.Logs;
 import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
 import org.slf4j.Logger;
+import org.thymeleaf.util.StringUtils;
 
 import java.util.Date;
+import java.util.List;
 
 import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
 
-public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters>
-		implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> {
+public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirstJobStepWorker<PT, ChunkRangeJson> {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
 
 	@Nonnull
 	@Override
 	public RunOutcome run(
 			@Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails,
-			@Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink)
+			@Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
 			throws JobExecutionFailedException {
 		PT params = theStepExecutionDetails.getParameters();
 
 		Date start = BATCH_START_DATE;
 		Date end = new Date();
 
-		if (params.getPartitionedUrls().isEmpty()) {
-			ourLog.info("Searching for All Resources from {} to {}", start, end);
-			PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
-			nextRange.setStart(start);
-			nextRange.setEnd(end);
-			theDataSink.accept(nextRange);
-		} else {
-			for (PartitionedUrl nextPartitionedUrl : params.getPartitionedUrls()) {
-				ourLog.info("Searching for [{}]] from {} to {}", nextPartitionedUrl, start, end);
-				PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
-				nextRange.setPartitionedUrl(nextPartitionedUrl);
-				nextRange.setStart(start);
-				nextRange.setEnd(end);
-				theDataSink.accept(nextRange);
+		// partitions may be configured in either of the following lists, both of which are optional
+		// the code below handles every combination of the two
+		// the logic can be simplified once PartitionedUrl.myRequestPartitionId is deprecated
+		// @see IJobPartitionProvider
+
+		List<RequestPartitionId> partitionIds = params.getRequestPartitionIds();
+		List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls();
+
+		if (partitionIds.isEmpty()) {
+			if (partitionedUrls.isEmpty()) {
+				ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
+				sendChunk(chunkRangeJson, theDataSink);
+				return RunOutcome.SUCCESS;
 			}
+			partitionedUrls.forEach(partitionedUrl -> {
+				String url = partitionedUrl.getUrl();
+				RequestPartitionId partitionId = partitionedUrl.getRequestPartitionId();
+				ChunkRangeJson chunkRangeJson =
+						new ChunkRangeJson(start, end).setUrl(url).setPartitionId(partitionId);
+				sendChunk(chunkRangeJson, theDataSink);
+			});
+			return RunOutcome.SUCCESS;
 		}
+		partitionIds.forEach(partitionId -> {
+			if (partitionedUrls.isEmpty()) {
+				ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end).setPartitionId(partitionId);
+				sendChunk(chunkRangeJson, theDataSink);
+				return;
+			}
+			partitionedUrls.forEach(partitionedUrl -> {
+				String url = partitionedUrl.getUrl();
+				RequestPartitionId urlPartitionId = partitionedUrl.getRequestPartitionId();
+				RequestPartitionId narrowPartitionId = determineNarrowPartitionId(partitionId, urlPartitionId);
+				ChunkRangeJson chunkRangeJson =
+						new ChunkRangeJson(start, end).setUrl(url).setPartitionId(narrowPartitionId);
+				sendChunk(chunkRangeJson, theDataSink);
+			});
+		});
 		return RunOutcome.SUCCESS;
 	}
+
+	private RequestPartitionId determineNarrowPartitionId(
+			@Nonnull RequestPartitionId theRequestPartitionId,
+			@Nullable RequestPartitionId theOtherRequestPartitionId) {
+		if (theOtherRequestPartitionId == null) {
+			return theRequestPartitionId;
+		}
+		if (theRequestPartitionId.isAllPartitions() && !theOtherRequestPartitionId.isAllPartitions()) {
+			return theOtherRequestPartitionId;
+		}
+		if (theRequestPartitionId.isDefaultPartition()
+				&& !theOtherRequestPartitionId.isDefaultPartition()
+				&& !theOtherRequestPartitionId.isAllPartitions()) {
+			return theOtherRequestPartitionId;
+		}
+		return theRequestPartitionId;
+	}
+
+	private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) {
+		String url = theData.getUrl();
+		ourLog.info(
+				"Creating chunks for [{}] from {} to {} for partition {}",
+				!StringUtils.isEmpty(url) ? url : "everything",
+				theData.getStart(),
+				theData.getEnd(),
+				theData.getPartitionId());
+		theDataSink.accept(theData);
+	}
 }
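To make the fan-out concrete: the step above emits one ChunkRangeJson per (url, partition) combination, narrowing each URL's own partition id against the job-level one. A sketch under the same assumptions as the parameterized test later in this patch:

import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;

import java.util.List;

class ChunkFanOutSketch {
	// 2 URLs x 3 partitions -> 6 chunks, matching the
	// (Observation?/Patient?, threePartitions, false, 6) case in
	// GenerateRangeChunksStepTest.
	static JobParameters sixChunks() {
		return JobParameters.from(
				List.of("Observation?", "Patient?"),
				List.of(
						RequestPartitionId.fromPartitionId(1),
						RequestPartitionId.fromPartitionId(2),
						RequestPartitionId.fromPartitionId(3)),
				false);
	}
	// Narrowing: a job-level all-partitions id defers to the URL's id, a
	// job-level default-partition id defers to a named URL partition, and
	// otherwise the job-level id wins (see determineNarrowPartitionId above).
}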
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/IIdChunkProducer.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/IIdChunkProducer.java
index 0a2ba575eaf..5528c763f56 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/IIdChunkProducer.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/IIdChunkProducer.java
@@ -27,13 +27,18 @@ import jakarta.annotation.Nullable;
 import java.util.Date;
 
 /**
- * A service that produces pages of resource pids based on the data provided by a previous batch step. Typically the
+ * A service that produces pages of resource pids based on the data provided by a previous batch step. Typically, the
 * first step in a batch job produces work chunks that define what types of data the batch operation will be performing
 * (e.g. a list of resource types and date ranges). This service is then used by the second step to actually query and
 * page through resource pids based on the chunk definitions produced by the first step.
- * @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc).
+ * @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc.).
 */
 public interface IIdChunkProducer<IT extends ChunkRangeJson> {
-	IResourcePidStream fetchResourceIdStream(
-			Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData);
+	@Deprecated(since = "7.3.7")
+	default IResourcePidStream fetchResourceIdStream(
+			Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData) {
+		return fetchResourceIdStream(theData);
+	}
+
+	IResourcePidStream fetchResourceIdStream(IT theData);
 }
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStep.java
index 9f6312b31c4..7e01f6124d4 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStep.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStep.java
@@ -24,26 +24,25 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
 import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
 import ca.uhn.fhir.batch2.api.RunOutcome;
 import ca.uhn.fhir.batch2.api.StepExecutionDetails;
-import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
+import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
-import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
+import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
 import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
 import jakarta.annotation.Nonnull;
 
-public class LoadIdsStep
-		implements IJobStepWorker<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> {
-	private final ResourceIdListStep<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myResourceIdListStep;
+public class LoadIdsStep<PT extends JobParameters>
+		implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
+	private final ResourceIdListStep<PT> myResourceIdListStep;
 
 	public LoadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
-		IIdChunkProducer<PartitionedUrlChunkRangeJson> idChunkProducer =
-				new PartitionedUrlListIdChunkProducer(theBatch2DaoSvc);
+		IIdChunkProducer<ChunkRangeJson> idChunkProducer = new ChunkProducer(theBatch2DaoSvc);
 		myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
 	}
 
 	@Nonnull
 	@Override
 	public RunOutcome run(
-			@Nonnull StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> theStepExecutionDetails,
+			@Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails,
 			@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
 			throws JobExecutionFailedException {
 		return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/PartitionedUrlListIdChunkProducer.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/PartitionedUrlListIdChunkProducer.java
deleted file mode 100644
index cf867168c01..00000000000
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/PartitionedUrlListIdChunkProducer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*-
- * #%L
- * HAPI FHIR JPA Server - Batch2 Task Processor
- * %%
- * Copyright (C) 2014 - 2024 Smile CDR, Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.step; - -import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; -import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; -import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; -import ca.uhn.fhir.util.Logs; -import jakarta.annotation.Nullable; -import org.slf4j.Logger; - -import java.util.Date; - -import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; - -public class PartitionedUrlListIdChunkProducer implements IIdChunkProducer { - private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); - private final IBatch2DaoSvc myBatch2DaoSvc; - - public PartitionedUrlListIdChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) { - myBatch2DaoSvc = theBatch2DaoSvc; - } - - @Override - public IResourcePidStream fetchResourceIdStream( - Date theStart, - Date theEnd, - @Nullable RequestPartitionId theRequestPartitionId, - PartitionedUrlChunkRangeJson theData) { - PartitionedUrl partitionedUrl = theData.getPartitionedUrl(); - - RequestPartitionId targetPartitionId; - String theUrl; - - if (partitionedUrl == null) { - theUrl = null; - targetPartitionId = theRequestPartitionId; - ourLog.info("Fetching resource ID chunk for everything - Range {} - {}", theStart, theEnd); - } else { - theUrl = partitionedUrl.getUrl(); - targetPartitionId = defaultIfNull(partitionedUrl.getRequestPartitionId(), theRequestPartitionId); - ourLog.info( - "Fetching resource ID chunk for URL {} - Range {} - {}", partitionedUrl.getUrl(), theStart, theEnd); - } - - return myBatch2DaoSvc.fetchResourceIdStream(theStart, theEnd, targetPartitionId, theUrl); - } -} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java index a9929444d36..10349aa4e47 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStep.java @@ -27,7 +27,7 @@ import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; import ca.uhn.fhir.util.Logs; @@ -42,39 +42,35 @@ import java.util.stream.Stream; import static ca.uhn.fhir.util.StreamUtil.partition; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; -public class ResourceIdListStep - implements IJobStepWorker { +public class ResourceIdListStep + implements IJobStepWorker { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); protected static final int MAX_BATCH_OF_IDS = 500; - private final IIdChunkProducer myIdChunkProducer; 
+ private final IIdChunkProducer myIdChunkProducer; - public ResourceIdListStep(IIdChunkProducer theIdChunkProducer) { + public ResourceIdListStep(IIdChunkProducer theIdChunkProducer) { myIdChunkProducer = theIdChunkProducer; } @Nonnull @Override public RunOutcome run( - @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull StepExecutionDetails theStepExecutionDetails, @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { - IT data = theStepExecutionDetails.getData(); + ChunkRangeJson data = theStepExecutionDetails.getData(); Date start = data.getStart(); Date end = data.getEnd(); Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize(); - ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end); - - RequestPartitionId requestPartitionId = - theStepExecutionDetails.getParameters().getRequestPartitionId(); + ourLog.info("Beginning to submit chunks in range {} to {}", start, end); int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS); - - final IResourcePidStream searchResult = myIdChunkProducer.fetchResourceIdStream( - start, end, requestPartitionId, theStepExecutionDetails.getData()); + final IResourcePidStream searchResult = + myIdChunkProducer.fetchResourceIdStream(theStepExecutionDetails.getData()); searchResult.visitStreamNoResult(typedResourcePidStream -> { AtomicInteger totalIdsFound = new AtomicInteger(); @@ -101,7 +97,7 @@ public class ResourceIdListStep partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation); + + // verify + assertThat(partitionIds).hasSize(1); + assertThat(partitionIds).containsExactlyInAnyOrder(partitionId); + } +} \ No newline at end of file diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducerTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducerTest.java new file mode 100644 index 00000000000..fababf329fc --- /dev/null +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ChunkProducerTest.java @@ -0,0 +1,53 @@ +package ca.uhn.fhir.batch2.jobs.step; + +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; +import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Date; + +import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ChunkProducerTest { + @Mock + private IBatch2DaoSvc myBatch2DaoSvc; + @InjectMocks + private ChunkProducer myChunkProducer; + + @Test + public void fetchResourceIdStream_worksAsExpected() { + // setup + RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1); + String url = "Patient?"; + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(BATCH_START_DATE, new Date()).setPartitionId(partitionId).setUrl(url); + IResourcePidStream stream = mock(IResourcePidStream.class); + when(myBatch2DaoSvc.fetchResourceIdStream(any(), any(), any(), any())).thenReturn(stream); + + // 
test + IResourcePidStream actualStream = myChunkProducer.fetchResourceIdStream(chunkRangeJson); + + // verify + assertThat(actualStream).isSameAs(stream); + ArgumentCaptor dateCaptor = ArgumentCaptor.forClass(Date.class); + ArgumentCaptor partitionCaptor = ArgumentCaptor.forClass(RequestPartitionId.class); + ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); + verify(myBatch2DaoSvc).fetchResourceIdStream(dateCaptor.capture(), dateCaptor.capture(), partitionCaptor.capture(), urlCaptor.capture()); + assertThat(dateCaptor.getAllValues()).containsExactly(chunkRangeJson.getStart(), chunkRangeJson.getEnd()); + assertThat(partitionCaptor.getValue()).isEqualTo(partitionId); + assertThat(urlCaptor.getValue()).isEqualTo(url); + + } +} \ No newline at end of file diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java new file mode 100644 index 00000000000..05f0dc76466 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java @@ -0,0 +1,140 @@ +package ca.uhn.fhir.batch2.jobs.step; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.Stream; + +import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class GenerateRangeChunksStepTest { + + private final GenerateRangeChunksStep myStep = new GenerateRangeChunksStep<>(); + @Mock + private StepExecutionDetails myStepExecutionDetails; + @Mock + private IJobDataSink myJobDataSink; + private static final Date START = BATCH_START_DATE; + private static final Date END = new Date(); + + @BeforeEach + void setUp() { + } + + @AfterEach + void tearDown() { + } + + public static Stream getReindexParameters() { + List threePartitions = List.of( + RequestPartitionId.fromPartitionId(1), + RequestPartitionId.fromPartitionId(2), + RequestPartitionId.fromPartitionId(3) + ); + List partition1 = List.of(RequestPartitionId.fromPartitionId(1)); + + // the actual values (URLs, partitionId) don't matter, but we add values similar to real hapi-fhir use-cases + return Stream.of( + Arguments.of(List.of(), List.of(), false, 1), + Arguments.of(List.of(), partition1, false, 1), + Arguments.of(List.of("Observation?"), threePartitions, false, 3), + Arguments.of(List.of("Observation?"), List.of(), false, 1), + Arguments.of(List.of("Observation?"), partition1, true, 1), + 
Arguments.of(List.of("Observation?", "Patient?"), threePartitions, false, 6), + Arguments.of(List.of("Observation?", "Patient?", "Practitioner?"), threePartitions, true, 3), + Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2), + Arguments.of(List.of("Observation?status=final"), threePartitions, false, 2) + ); + } + + @ParameterizedTest + @MethodSource(value = "getReindexParameters") + public void run_withParameters_producesExpectedChunks(List theUrls, List thePartitions, + boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) { + JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl); + + when(myStepExecutionDetails.getParameters()).thenReturn(parameters); + myStep.run(myStepExecutionDetails, myJobDataSink); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ChunkRangeJson.class); + verify(myJobDataSink, times(theExpectedChunkCount)).accept(captor.capture()); + + List chunkRangeJsonList = getExpectedChunkList(theUrls, thePartitions, theShouldAssignPartitionToUrl, theExpectedChunkCount); + + RequestPartitionId[] actualPartitionIds = captor.getAllValues().stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]); + RequestPartitionId[] expectedPartitionIds = chunkRangeJsonList.stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]); + assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds); + + String[] actualUrls = captor.getAllValues().stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]); + String[] expectedUrls = chunkRangeJsonList.stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]); + assertThat(actualUrls).containsExactlyInAnyOrder(expectedUrls); + } + + private List getExpectedChunkList(List theUrls, List thePartitions, + boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) { + List chunkRangeJsonList = new ArrayList<>(); + if (theShouldAssignPartitionToUrl) { + for (int i = 0; i < theExpectedChunkCount; i++) { + String url = theUrls.get(i); + RequestPartitionId partition = thePartitions.get(i); + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition); + chunkRangeJsonList.add(chunkRangeJson); + } + return chunkRangeJsonList; + } + + if (theUrls.isEmpty() && thePartitions.isEmpty()) { + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END); + chunkRangeJsonList.add(chunkRangeJson); + return chunkRangeJsonList; + } + + + if (theUrls.isEmpty()) { + for (RequestPartitionId partition : thePartitions) { + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setPartitionId(partition); + chunkRangeJsonList.add(chunkRangeJson); + } + return chunkRangeJsonList; + } + + if (thePartitions.isEmpty()) { + for (String url : theUrls) { + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url); + chunkRangeJsonList.add(chunkRangeJson); + } + return chunkRangeJsonList; + } + + theUrls.forEach(url -> { + for (RequestPartitionId partition : thePartitions) { + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition); + chunkRangeJsonList.add(chunkRangeJson); + } + }); + + return chunkRangeJsonList; + } +} \ No newline at end of file diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java index 878728274fd..1dec4ec41c4 
100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/LoadIdsStepTest.java @@ -2,11 +2,12 @@ package ca.uhn.fhir.batch2.jobs.step; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.StepExecutionDetails; -import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList; import ca.uhn.fhir.jpa.api.pid.IResourcePidList; import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; @@ -48,11 +49,11 @@ public class LoadIdsStepTest { @Mock private IJobDataSink mySink; - private LoadIdsStep mySvc; + private LoadIdsStep mySvc; @BeforeEach public void before() { - mySvc = new LoadIdsStep(myBatch2DaoSvc); + mySvc = new LoadIdsStep<>(myBatch2DaoSvc); } @Captor @@ -60,18 +61,17 @@ public class LoadIdsStepTest { @Test public void testGenerateSteps() { - PartitionedUrlListJobParameters parameters = new PartitionedUrlListJobParameters(); - PartitionedUrlChunkRangeJson range = new PartitionedUrlChunkRangeJson(); - range.setStart(DATE_1).setEnd(DATE_END); + JobParameters parameters = new JobParameters(); + ChunkRangeJson range = new ChunkRangeJson(DATE_1, DATE_END).setPartitionId(RequestPartitionId.allPartitions()); String instanceId = "instance-id"; JobInstance jobInstance = JobInstance.fromInstanceId(instanceId); String chunkId = "chunk-id"; - StepExecutionDetails details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId)); + StepExecutionDetails details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId)); // First Execution when(myBatch2DaoSvc.fetchResourceIdStream(eq(DATE_1), eq(DATE_END), isNull(), isNull())) - .thenReturn(createIdChunk(0L, 20000L, DATE_2)); + .thenReturn(createIdChunk()); mySvc.run(details, mySink); @@ -98,13 +98,12 @@ public class LoadIdsStepTest { } @Nonnull - private IResourcePidStream createIdChunk(long idLow, long idHigh, Date lastDate) { + private IResourcePidStream createIdChunk() { List ids = new ArrayList<>(); - List resourceTypes = new ArrayList<>(); - for (long i = idLow; i < idHigh; i++) { + for (long i = 0; i < 20000; i++) { ids.add(JpaPid.fromId(i)); } - IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, lastDate, null); + IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, DATE_2, null); return new ListWrappingPidStream(chunk); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java index af064f8cfd2..df360743444 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/ResourceIdListStepTest.java @@ -3,9 +3,10 @@ package ca.uhn.fhir.batch2.jobs.step; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; 
-import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList; import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream; @@ -35,20 +36,20 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class ResourceIdListStepTest { @Mock - private IIdChunkProducer myIdChunkProducer; + private IIdChunkProducer myIdChunkProducer; @Mock - private StepExecutionDetails myStepExecutionDetails; + private StepExecutionDetails myStepExecutionDetails; @Mock private IJobDataSink myDataSink; @Mock - private PartitionedUrlChunkRangeJson myData; + private ChunkRangeJson myData; @Mock - private PartitionedUrlListJobParameters myParameters; + private JobParameters myParameters; @Captor private ArgumentCaptor myDataCaptor; - private ResourceIdListStep myResourceIdListStep; + private ResourceIdListStep myResourceIdListStep; @BeforeEach void beforeEach() { @@ -59,11 +60,13 @@ class ResourceIdListStepTest { @ValueSource(ints = {0, 1, 100, 500, 501, 2345, 10500}) void testResourceIdListBatchSizeLimit(int theListSize) { List idList = generateIdList(theListSize); + RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1); + when(myStepExecutionDetails.getData()).thenReturn(myData); when(myParameters.getBatchSize()).thenReturn(500); when(myStepExecutionDetails.getParameters()).thenReturn(myParameters); - IResourcePidStream mockStream = new ListWrappingPidStream( - new HomogeneousResourcePidList("Patient", idList, null, null)); + IResourcePidStream resourcePidStream = new ListWrappingPidStream( + new HomogeneousResourcePidList("Patient", idList, null, partitionId)); if (theListSize > 0) { // Ensure none of the work chunks exceed MAX_BATCH_OF_IDS in size: doAnswer(i -> { @@ -73,8 +76,7 @@ class ResourceIdListStepTest { return null; }).when(myDataSink).accept(any(ResourceIdListWorkChunkJson.class)); } - when(myIdChunkProducer.fetchResourceIdStream(any(), any(), any(), any())) - .thenReturn(mockStream); + when(myIdChunkProducer.fetchResourceIdStream(any())).thenReturn(resourcePidStream); final RunOutcome run = myResourceIdListStep.run(myStepExecutionDetails, myDataSink); assertThat(run).isNotEqualTo(null); @@ -86,8 +88,9 @@ class ResourceIdListStepTest { assertThat(allDataChunks).hasSize(expectedBatchCount); // Ensure that all chunks except the very last one are MAX_BATCH_OF_IDS in length - for (int i = 0; i < expectedBatchCount - 1; i++) { - assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, allDataChunks.get(i).size()); + for (ResourceIdListWorkChunkJson dataChunk : allDataChunks) { + assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, dataChunk.size()); + assertEquals(partitionId, dataChunk.getRequestPartitionId()); } // The very last chunk should be whatever is left over (if there is a remainder): diff --git a/hapi-fhir-storage-mdm/pom.xml b/hapi-fhir-storage-mdm/pom.xml index 56ae8033745..28dcc344718 100644 --- a/hapi-fhir-storage-mdm/pom.xml +++ b/hapi-fhir-storage-mdm/pom.xml @@ -30,7 +30,7 @@ ca.uhn.hapi.fhir - hapi-fhir-storage-batch2 + hapi-fhir-storage-batch2-jobs ${project.version} diff --git 
a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/LoadGoldenIdsStep.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/LoadGoldenIdsStep.java index 4a1bcfa9c85..c3a36ce1fd5 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/LoadGoldenIdsStep.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/LoadGoldenIdsStep.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker; import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer; import ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep; @@ -32,11 +33,11 @@ import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters; import jakarta.annotation.Nonnull; public class LoadGoldenIdsStep - implements IJobStepWorker { - private final ResourceIdListStep myResourceIdListStep; + implements IJobStepWorker { + private final ResourceIdListStep myResourceIdListStep; public LoadGoldenIdsStep(IGoldenResourceSearchSvc theGoldenResourceSearchSvc) { - IIdChunkProducer idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc); + IIdChunkProducer idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc); myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer); } @@ -44,7 +45,7 @@ public class LoadGoldenIdsStep @Nonnull @Override public RunOutcome run( - @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull StepExecutionDetails theStepExecutionDetails, @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { return myResourceIdListStep.run(theStepExecutionDetails, theDataSink); diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmBatch2Config.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmBatch2Config.java index 6d000bf9561..4a2e6c08932 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmBatch2Config.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmBatch2Config.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.mdm.batch2; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; +import ca.uhn.fhir.batch2.jobs.config.BatchCommonCtx; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx; import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters; @@ -34,7 +35,7 @@ import static ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx.MDM_CLEAR_JOB_BEAN_NAM import static ca.uhn.fhir.mdm.batch2.submit.MdmSubmitAppCtx.MDM_SUBMIT_JOB_BEAN_NAME; @Configuration -@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class}) +@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class, BatchCommonCtx.class}) public class MdmBatch2Config { @Bean MdmJobDefinitionLoader mdmJobDefinitionLoader( diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmChunkRangeJson.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmChunkRangeJson.java deleted file mode 100644 index 4d778a98efa..00000000000 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmChunkRangeJson.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * #%L - * hapi-fhir-storage-mdm - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.mdm.batch2; - -import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.annotation.Nonnull; -import jakarta.annotation.Nullable; - -public class MdmChunkRangeJson extends ChunkRangeJson { - @Nonnull - @JsonProperty("resourceType") - private String myResourceType; - - @Nonnull - public String getResourceType() { - return myResourceType; - } - - public void setResourceType(@Nullable String theResourceType) { - myResourceType = theResourceType; - } -} diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmGenerateRangeChunksStep.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmGenerateRangeChunksStep.java index 8a1bbe4d382..1ccc643c17f 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmGenerateRangeChunksStep.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/MdmGenerateRangeChunksStep.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.util.Batch2Utils; import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters; import jakarta.annotation.Nonnull; @@ -33,14 +34,14 @@ import org.slf4j.LoggerFactory; import java.util.Date; -public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker { +public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker { private static final Logger ourLog = LoggerFactory.getLogger(MdmGenerateRangeChunksStep.class); @Nonnull @Override public RunOutcome run( @Nonnull StepExecutionDetails theStepExecutionDetails, - @Nonnull IJobDataSink theDataSink) + @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { MdmClearJobParameters params = theStepExecutionDetails.getParameters(); @@ -49,10 +50,10 @@ public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker { +public class MdmIdChunkProducer implements IIdChunkProducer { private static final Logger ourLog = LoggerFactory.getLogger(MdmIdChunkProducer.class); private final IGoldenResourceSearchSvc myGoldenResourceSearchSvc; @@ -38,17 +35,16 @@ public class MdmIdChunkProducer implements IIdChunkProducer { } @Override - public IResourcePidStream fetchResourceIdStream( - Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, MdmChunkRangeJson theData) { + public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) { String resourceType = theData.getResourceType(); ourLog.info( "Fetching golden resource ID chunk for resource type {} - Range {} - {}", resourceType, - theStart, - theEnd); + theData.getStart(), + theData.getEnd()); return myGoldenResourceSearchSvc.fetchGoldenResourceIdStream( - theStart, theEnd, theRequestPartitionId, resourceType); + theData.getStart(), theData.getEnd(), theData.getPartitionId(), resourceType); } } diff --git 
a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearAppCtx.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearAppCtx.java index 4a6e6b928f6..d2304f1a1f9 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearAppCtx.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearAppCtx.java @@ -19,13 +19,13 @@ */ package ca.uhn.fhir.mdm.batch2.clear; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc; import ca.uhn.fhir.mdm.api.IMdmSettings; import ca.uhn.fhir.mdm.batch2.LoadGoldenIdsStep; -import ca.uhn.fhir.mdm.batch2.MdmChunkRangeJson; import ca.uhn.fhir.mdm.batch2.MdmGenerateRangeChunksStep; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -50,7 +50,7 @@ public class MdmClearAppCtx { .addFirstStep( "generate-ranges", "Generate date ranges to Mdm Clear", - MdmChunkRangeJson.class, + ChunkRangeJson.class, mdmGenerateRangeChunksStep()) .addIntermediateStep( "find-golden-resource-ids", diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearJobParameters.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearJobParameters.java index 6baeba5e6e3..8018b68679d 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearJobParameters.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearJobParameters.java @@ -19,7 +19,7 @@ */ package ca.uhn.fhir.mdm.batch2.clear; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.validation.constraints.Pattern; @@ -28,7 +28,7 @@ import org.apache.commons.lang3.Validate; import java.util.ArrayList; import java.util.List; -public class MdmClearJobParameters extends PartitionedJobParameters { +public class MdmClearJobParameters extends JobParameters { @JsonProperty("resourceType") @Nonnull private List<@Pattern(regexp = "^[A-Z][A-Za-z]+$", message = "If populated, must be a valid resource type'") String> diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearStep.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearStep.java index ba97fb6dca8..af008c37e1f 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearStep.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/clear/MdmClearStep.java @@ -75,8 +75,7 @@ public class MdmClearStep implements IJobStepWorker submitGenerateRangeChunksStep() { - return new GenerateRangeChunksStep<>(); - } - @Bean(name = MDM_SUBMIT_JOB_BEAN_NAME) public JobDefinition mdmSubmitJobDefinition( IBatch2DaoSvc theBatch2DaoSvc, @@ -60,7 +55,7 @@ public class MdmSubmitAppCtx { .addFirstStep( "generate-ranges", "generate data ranges to submit to mdm", - PartitionedUrlChunkRangeJson.class, + ChunkRangeJson.class, submitGenerateRangeChunksStep()) .addIntermediateStep( "load-ids", "Load the IDs", ResourceIdListWorkChunkJson.class, loadIdsStep(theBatch2DaoSvc)) @@ -77,6 +72,11 @@ public class MdmSubmitAppCtx { return new MdmSubmitJobParametersValidator(theMdmSettings, 
theMatchUrlService, theFhirContext); } + @Bean + public GenerateRangeChunksStep submitGenerateRangeChunksStep() { + return new GenerateRangeChunksStep<>(); + } + @Bean public LoadIdsStep loadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) { return new LoadIdsStep<>(theBatch2DaoSvc); diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParameters.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParameters.java index 663478fb44a..8dd0ec6bb2c 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParameters.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParameters.java @@ -19,6 +19,6 @@ */ package ca.uhn.fhir.mdm.batch2.submit; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters; +import ca.uhn.fhir.batch2.jobs.parameters.JobParameters; -public class MdmSubmitJobParameters extends PartitionedUrlListJobParameters {} +public class MdmSubmitJobParameters extends JobParameters {} diff --git a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParametersValidator.java b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParametersValidator.java index f75edc5fab5..3a8bf0bf961 100644 --- a/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParametersValidator.java +++ b/hapi-fhir-storage-mdm/src/main/java/ca/uhn/fhir/mdm/batch2/submit/MdmSubmitJobParametersValidator.java @@ -54,25 +54,25 @@ public class MdmSubmitJobParametersValidator implements IJobParametersValidator< String resourceType = getResourceTypeFromUrl(url); RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resourceType); validateTypeIsUsedByMdm(errorMsgs, resourceType); - validateAllSearchParametersApplyToResourceType(errorMsgs, partitionedUrl, resourceType, resourceDefinition); + validateAllSearchParametersApplyToResourceType(errorMsgs, url, resourceType, resourceDefinition); } return errorMsgs; } private void validateAllSearchParametersApplyToResourceType( - List errorMsgs, - PartitionedUrl partitionedUrl, - String resourceType, + List theErrorMessages, + String theUrl, + String theResourceType, RuntimeResourceDefinition resourceDefinition) { try { - myMatchUrlService.translateMatchUrl(partitionedUrl.getUrl(), resourceDefinition); + myMatchUrlService.translateMatchUrl(theUrl, resourceDefinition); } catch (MatchUrlService.UnrecognizedSearchParameterException e) { String errorMsg = String.format( "Search parameter %s is not recognized for resource type %s. 
Source error is %s", - e.getParamName(), resourceType, e.getMessage()); - errorMsgs.add(errorMsg); + e.getParamName(), theResourceType, e.getMessage()); + theErrorMessages.add(errorMsg); } catch (InvalidRequestException e) { - errorMsgs.add("Invalid request detected: " + e.getMessage()); + theErrorMessages.add("Invalid request detected: " + e.getMessage()); } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java index be151786428..b5fb4d427f9 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java @@ -38,8 +38,11 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -50,6 +53,7 @@ import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.doCal import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.hasHooks; public abstract class BaseRequestPartitionHelperSvc implements IRequestPartitionHelperSvc { + private static final Logger ourLog = LoggerFactory.getLogger(BaseRequestPartitionHelperSvc.class); private final HashSet myNonPartitionableResourceNames; @@ -95,81 +99,86 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition @Override public RequestPartitionId determineReadPartitionForRequest( @Nullable RequestDetails theRequest, @Nonnull ReadPartitionIdRequestDetails theDetails) { - RequestPartitionId requestPartitionId; - - String resourceType = theDetails.getResourceType(); - boolean nonPartitionableResource = !isResourcePartitionable(resourceType); - if (myPartitionSettings.isPartitioningEnabled()) { - - RequestDetails requestDetails = theRequest; - // TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through - // SystemRequestDetails instead. 
-			if (requestDetails == null) {
-				requestDetails = new SystemRequestDetails();
-			}
-
-			// Handle system requests
-			if (requestDetails instanceof SystemRequestDetails
-					&& systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails)
-					&& !nonPartitionableResource) {
-				requestPartitionId =
-						getSystemRequestPartitionId((SystemRequestDetails) requestDetails, nonPartitionableResource);
-			} else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) {
-				return RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId());
-			} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
-				// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
-				HookParams params = new HookParams()
-						.add(RequestDetails.class, requestDetails)
-						.addIfMatchesType(ServletRequestDetails.class, requestDetails);
-				requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
-						myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
-			} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) {
-				// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
-				HookParams params = new HookParams()
-						.add(RequestDetails.class, requestDetails)
-						.addIfMatchesType(ServletRequestDetails.class, requestDetails)
-						.add(ReadPartitionIdRequestDetails.class, theDetails);
-				requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
-						myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
-			} else {
-				requestPartitionId = null;
-			}
-
-			validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
-
-			return validateNormalizeAndNotifyHooksForRead(requestPartitionId, requestDetails, resourceType);
+		if (!myPartitionSettings.isPartitioningEnabled()) {
+			return RequestPartitionId.allPartitions();
 		}
-		return RequestPartitionId.allPartitions();
 
+		// certain use-cases (e.g. batch2 jobs) only have the resource type populated in the ReadPartitionIdRequestDetails
+		// TODO MM: see if we can make RequestDetails consistent
+		String resourceType = theDetails.getResourceType();
+
+		RequestDetails requestDetails = theRequest;
+		// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
+		// SystemRequestDetails instead.
+ if (requestDetails == null) { + requestDetails = new SystemRequestDetails(); + } + + boolean nonPartitionableResource = isResourceNonPartitionable(resourceType); + + RequestPartitionId requestPartitionId = null; + // Handle system requests + if (requestDetails instanceof SystemRequestDetails + && systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails) + && !nonPartitionableResource) { + requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) requestDetails, false); + } else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) { + requestPartitionId = RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId()); + } else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) { + // Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY + HookParams params = new HookParams() + .add(RequestDetails.class, requestDetails) + .addIfMatchesType(ServletRequestDetails.class, requestDetails); + requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject( + myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params); + } else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) { + // Interceptor call: STORAGE_PARTITION_IDENTIFY_READ + HookParams params = new HookParams() + .add(RequestDetails.class, requestDetails) + .addIfMatchesType(ServletRequestDetails.class, requestDetails) + .add(ReadPartitionIdRequestDetails.class, theDetails); + requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject( + myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params); + } + + validateRequestPartitionNotNull( + requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, Pointcut.STORAGE_PARTITION_IDENTIFY_READ); + validateRequestPartition(requestPartitionId, resourceType); + + ourLog.info("Read with partition: {}", requestPartitionId); + + return validateAndNormalizePartition(requestPartitionId, requestDetails, resourceType); } @Override public RequestPartitionId determineGenericPartitionForRequest(RequestDetails theRequestDetails) { - RequestPartitionId retVal = null; + RequestPartitionId requestPartitionId = null; - if (myPartitionSettings.isPartitioningEnabled()) { - if (theRequestDetails instanceof SystemRequestDetails) { - SystemRequestDetails systemRequestDetails = (SystemRequestDetails) theRequestDetails; - retVal = systemRequestDetails.getRequestPartitionId(); - } + if (!myPartitionSettings.isPartitioningEnabled()) { + return RequestPartitionId.allPartitions(); } - if (retVal == null) { - if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequestDetails)) { - // Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY - HookParams params = new HookParams() - .add(RequestDetails.class, theRequestDetails) - .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); - retVal = (RequestPartitionId) doCallHooksAndReturnObject( - myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params); - - if (retVal != null) { - retVal = validateNormalizeAndNotifyHooksForRead(retVal, theRequestDetails, null); - } - } + if (theRequestDetails instanceof SystemRequestDetails + && systemRequestHasExplicitPartition((SystemRequestDetails) theRequestDetails)) { + requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) theRequestDetails); + } else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, 
myInterceptorBroadcaster, theRequestDetails)) { + // Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY + HookParams params = new HookParams() + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); + requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject( + myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params); } - return retVal; + // TODO MM: at the moment it is ok for this method to return null + // check if it can be made consistent and it's implications + // validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY); + + if (requestPartitionId != null) { + return validateAndNormalizePartition( + requestPartitionId, theRequestDetails, theRequestDetails.getResourceName()); + } + return requestPartitionId; } /** @@ -216,56 +225,61 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition @Nonnull @Override public RequestPartitionId determineCreatePartitionForRequest( - @Nullable RequestDetails theRequest, @Nonnull IBaseResource theResource, @Nonnull String theResourceType) { - RequestPartitionId requestPartitionId; + @Nullable final RequestDetails theRequest, + @Nonnull IBaseResource theResource, + @Nonnull String theResourceType) { if (!myPartitionSettings.isPartitioningEnabled()) { return RequestPartitionId.allPartitions(); } - boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType); - + RequestDetails requestDetails = theRequest; // TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through // SystemRequestDetails instead. - if ((theRequest == null || theRequest instanceof SystemRequestDetails) && nonPartitionableResource) { - return RequestPartitionId.defaultPartition(); + if (theRequest == null) { + requestDetails = new SystemRequestDetails(); } + boolean nonPartitionableResource = isResourceNonPartitionable(theResourceType); + + RequestPartitionId requestPartitionId = null; if (theRequest instanceof SystemRequestDetails && systemRequestHasExplicitPartition((SystemRequestDetails) theRequest)) { requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) theRequest, nonPartitionableResource); - } else { - if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequest)) { - // Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY - HookParams params = new HookParams() - .add(RequestDetails.class, theRequest) - .addIfMatchesType(ServletRequestDetails.class, theRequest); - requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject( - myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params); - } else { - // This is an external Request (e.g. ServletRequestDetails) so we want to figure out the partition - // via interceptor. 
-			// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
-			HookParams params = new HookParams()
-					.add(IBaseResource.class, theResource)
-					.add(RequestDetails.class, theRequest)
-					.addIfMatchesType(ServletRequestDetails.class, theRequest);
-			requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
-					myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
-			}
-
-			// If the interceptors haven't selected a partition, and its a non-partitionable resource anyhow, send
-			// to DEFAULT
-			if (nonPartitionableResource && requestPartitionId == null) {
-				requestPartitionId = RequestPartitionId.defaultPartition();
-			}
+		} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
+			// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
+			HookParams params = new HookParams()
+					.add(RequestDetails.class, requestDetails)
+					.addIfMatchesType(ServletRequestDetails.class, requestDetails);
+			requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
+					myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
+		} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, myInterceptorBroadcaster, requestDetails)) {
+			// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
+			HookParams params = new HookParams()
+					.add(IBaseResource.class, theResource)
+					.add(RequestDetails.class, requestDetails)
+					.addIfMatchesType(ServletRequestDetails.class, requestDetails);
+			requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
+					myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
 		}
 
-		String resourceName = myFhirContext.getResourceType(theResource);
-		validateSinglePartitionForCreate(requestPartitionId, resourceName, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE);
+		// If the interceptors haven't selected a partition, and it's a non-partitionable resource anyhow, send
+		// to DEFAULT
+		if (nonPartitionableResource && requestPartitionId == null) {
+			requestPartitionId = RequestPartitionId.defaultPartition();
+		}
 
-		return validateNormalizeAndNotifyHooksForRead(requestPartitionId, theRequest, theResourceType);
+		validateRequestPartitionNotNull(
+				requestPartitionId,
+				Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE,
+				Pointcut.STORAGE_PARTITION_IDENTIFY_ANY);
+		validateSinglePartitionForCreate(requestPartitionId);
+		validateRequestPartition(requestPartitionId, theResourceType);
+
+		ourLog.info("Create with partition: {}", requestPartitionId);
+
+		return validateAndNormalizePartition(requestPartitionId, requestDetails, theResourceType);
 	}
 
 	private boolean systemRequestHasExplicitPartition(@Nonnull SystemRequestDetails theRequest) {
@@ -288,7 +302,7 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
 	 * If the partition has both, they are validated to ensure that they correspond.
*/ @Nonnull - private RequestPartitionId validateNormalizeAndNotifyHooksForRead( + private RequestPartitionId validateAndNormalizePartition( @Nonnull RequestPartitionId theRequestPartitionId, RequestDetails theRequest, @Nullable String theResourceType) { @@ -330,25 +344,31 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition @Override public boolean isResourcePartitionable(String theResourceType) { - return !myNonPartitionableResourceNames.contains(theResourceType); + return theResourceType != null && !myNonPartitionableResourceNames.contains(theResourceType); } - private void validateSinglePartitionForCreate( - RequestPartitionId theRequestPartitionId, @Nonnull String theResourceName, Pointcut thePointcut) { - validateRequestPartitionNotNull(theRequestPartitionId, thePointcut); + private boolean isResourceNonPartitionable(String theResourceType) { + return theResourceType != null && !isResourcePartitionable(theResourceType); + } + private void validateSinglePartitionForCreate(RequestPartitionId theRequestPartitionId) { if (theRequestPartitionId.hasPartitionIds()) { validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionIds()); } validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionNames()); + } + private void validateRequestPartition(RequestPartitionId theRequestPartitionId, String theResourceName) { // Make sure we're not using one of the conformance resources in a non-default partition + if (theRequestPartitionId.isDefaultPartition() || theRequestPartitionId.isAllPartitions()) { + return; + } if ((theRequestPartitionId.hasPartitionIds() && !theRequestPartitionId.getPartitionIds().contains(null)) || (theRequestPartitionId.hasPartitionNames() && !theRequestPartitionId.getPartitionNames().contains(JpaConstants.DEFAULT_PARTITION_NAME))) { - if (!isResourcePartitionable(theResourceName)) { + if (isResourceNonPartitionable(theResourceName)) { String msg = myFhirContext .getLocalizer() .getMessageSanitized( @@ -360,10 +380,10 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition } } - private void validateRequestPartitionNotNull(RequestPartitionId theRequestPartitionId, Pointcut theThePointcut) { + private void validateRequestPartitionNotNull(RequestPartitionId theRequestPartitionId, Pointcut... thePointcuts) { if (theRequestPartitionId == null) { throw new InternalErrorException( - Msg.code(1319) + "No interceptor provided a value for pointcut: " + theThePointcut); + Msg.code(1319) + "No interceptor provided a value for pointcuts: " + Arrays.toString(thePointcuts)); } }
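For context on the refactored flow above, a hedged sketch of the kind of partition interceptor that determineReadPartitionForRequest() consults via STORAGE_PARTITION_IDENTIFY_READ (the tenant-to-partition mapping is illustrative only, not part of this patch):

import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;

@Interceptor
public class TenantToPartitionInterceptorSketch {
	@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
	public RequestPartitionId identifyForRead(RequestDetails theRequestDetails) {
		// Illustrative mapping: treat the request's tenant id as the partition name
		return RequestPartitionId.fromPartitionName(theRequestDetails.getTenantId());
	}
}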