Add support for running the reindex job across resources in multiple partitions and all partitions (#6009)

* Cleanup and fix compile warnings

* Cleanup and enhance batch2 jobs to support running them across multiple partitions.

* Add some documentation and fix build in the pipeline.

* Fix tests

* Add support for RequestPartitionId.allPartitions as input to the IJobPartitionProvider. Add tests and fix one test. Add missing changelog.

* Separate basic partition provider logic so it can be reused by Mongo. Update documentation.

* Revert change to apply certain validation only for creates.

* Remove added logs

* spotless fix

* Address code review comments
Martha Mitran 2024-07-05 22:23:07 -07:00 committed by GitHub
parent 5e519810ff
commit ecef72729f
62 changed files with 1194 additions and 621 deletions

View File

@ -204,8 +204,6 @@ ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.invalidName=Partition name "{0}
ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDuplicatePartitionName=Partition name "{0}" is already defined
ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDefaultPartition=Can not create partition with name "DEFAULT"
ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor.unknownTenantName=Unknown tenant: {0}
ca.uhn.fhir.jpa.dao.HistoryBuilder.noSystemOrTypeHistoryForPartitionAwareServer=Type- and Server- level history operation not supported across partitions on partitioned server
ca.uhn.fhir.jpa.provider.DiffProvider.cantDiffDifferentTypes=Unable to diff two resources of different types

View File

@ -0,0 +1,7 @@
---
type: add
issue: 6008
title: "Previously, when partitioning is enabled, the reindex job could only be run against one partition.
The reindex job parameters can now accept a list of partitions or all partitions (list of RequestPartitionId or RequestPartitionId.allPartitions).
In the future, the functionality can be extended to other bulk operations run via batch2 jobs (e.g. $delete-expunge, $export).
"

View File

@ -130,14 +130,14 @@ Once enabled, HTTP Requests to the FHIR server must include the name of the part
POST www.example.com/fhir/Patient
```
With partitioning enabled, if we were to now create a patient in the `DEFAULT` partition, the request would now look like this:
With partitioning enabled, if we were to now create a patient in the `P1` partition, the request would now look like this:
```
POST www.example.com/fhir/DEFAULT/Patient
POST www.example.com/fhir/P1/Patient
```
Failure to add a partition name to the request path will result in an error when multitenancy is enabled.
If a tenant name is not provided in the request path, the tenant will be defaulted and the request will use the 'DEFAULT' partition.
# Limitations

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
@ -27,6 +28,8 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import jakarta.persistence.EntityManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -52,4 +55,10 @@ public class JpaBatch2Config extends BaseBatch2Config {
theEntityManager,
theInterceptorBroadcaster);
}
@Bean
public IJobPartitionProvider jobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
return new JpaJobPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc);
}
}

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
import java.util.stream.Collectors;
/**
* The default JPA implementation, which uses {@link IRequestPartitionHelperSvc} and {@link IPartitionLookupSvc}
* to determine the partitions on which a batch2 job should run.
* The latter is used when the job is configured to run against all partitions
* (a bulk system operation) and returns the actual list of all configured partitions.
*/
public class JpaJobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private final IPartitionLookupSvc myPartitionLookupSvc;
public JpaJobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
myPartitionLookupSvc = thePartitionLookupSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
if (!partitionId.isAllPartitions()) {
return List.of(partitionId);
}
// handle (bulk) system operations that are typically configured with RequestPartitionId.allPartitions()
// populate the actual list of all partitions
List<RequestPartitionId> partitionIdList = myPartitionLookupSvc.listPartitions().stream()
.map(PartitionEntity::toRequestPartitionId)
.collect(Collectors.toList());
partitionIdList.add(RequestPartitionId.defaultPartition());
return partitionIdList;
}
}

View File

@ -0,0 +1,76 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JpaJobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IPartitionLookupSvc myPartitionLookupSvc;
@InjectMocks
private JpaJobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List <RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
@Test
public void getPartitions_requestAllPartitions_returnsListOfAllSpecificPartitions() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation)))
.thenReturn( RequestPartitionId.allPartitions());
List<RequestPartitionId> partitionIds = List.of(RequestPartitionId.fromPartitionIds(1), RequestPartitionId.fromPartitionIds(2));
List<PartitionEntity> partitionEntities = new ArrayList<>();
partitionIds.forEach(partitionId -> {
PartitionEntity entity = mock(PartitionEntity.class);
when(entity.toRequestPartitionId()).thenReturn(partitionId);
partitionEntities.add(entity);
});
when(myPartitionLookupSvc.listPartitions()).thenReturn(partitionEntities);
List<RequestPartitionId> expectedPartitionIds = new ArrayList<>(partitionIds);
expectedPartitionIds.add(RequestPartitionId.defaultPartition());
// test
List<RequestPartitionId> actualPartitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(actualPartitionIds).hasSize(expectedPartitionIds.size());
Assertions.assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds.toArray(new RequestPartitionId[0]));
}
}

View File

@ -1,8 +1,5 @@
package ca.uhn.fhir.jpa.bulk.imprt2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.jobs.imprt.ConsumeFilesStep;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@ -24,6 +21,9 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@TestMethodOrder(MethodOrderer.MethodName.class)
@ -141,15 +141,17 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
// Validate
if (partitionEnabled) {
assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
} else {
assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
}
int expectedSelectQueryCount = partitionEnabled ? 8 : 6;
assertEquals(expectedSelectQueryCount, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(4, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.countCommits());
// PartitionLookupSvcImpl#lookupPartitionByName generates one additional commit
// because it executes in a transaction (calls executeInTransaction)
// we may want to change that in the future
int expectedCommitCount = partitionEnabled ? 2 : 1;
assertEquals(expectedCommitCount, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
patient = myPatientDao.read(new IdType("Patient/A"), mySrd);

View File

@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
@ -277,7 +278,8 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails) {
public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails,
ReadPartitionIdRequestDetails theDetails) {
// Just to be nice, figure out the first line in the stack that isn't a part of the
// partitioning or interceptor infrastructure, just so it's obvious who is asking

View File

@ -15,6 +15,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
@ -40,9 +42,10 @@ public class ReindexStepTest {
public void testMethodReindex_withRequestPartitionId_willExecuteWithPartitionId(){
// given
Integer expectedPartitionId = 1;
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(expectedPartitionId);
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(List.of(), partitionId);
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
reindexJobParameters.setRequestPartitionId(RequestPartitionId.fromPartitionId(expectedPartitionId));
reindexJobParameters.setRequestPartitionId(partitionId);
when(myHapiTransactionService.withRequest(any())).thenCallRealMethod();
when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();

View File

@ -20,8 +20,8 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class DeleteExpungeJobTest extends BaseJpaR4Test {
@Autowired
@ -62,7 +62,8 @@ public class DeleteExpungeJobTest extends BaseJpaR4Test {
assertEquals(2, myDiagnosticReportDao.search(SearchParameterMap.newSynchronous()).size());
DeleteExpungeJobParameters jobParameters = new DeleteExpungeJobParameters();
jobParameters.addUrl("Observation?subject.active=false").addUrl("DiagnosticReport?subject.active=false");
jobParameters.addUrl("Observation?subject.active=false");
jobParameters.addUrl("DiagnosticReport?subject.active=false");
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setParameters(jobParameters);

View File

@ -311,6 +311,29 @@ public class ReindexJobTest extends BaseJpaR4Test {
myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(reindexPropertyCache);
}
@Test
public void testReindex_byMultipleUrls_indexesMatchingResources() {
// setup
createObservation(withStatus(Observation.ObservationStatus.FINAL.toCode()));
createObservation(withStatus(Observation.ObservationStatus.CANCELLED.toCode()));
createPatient(withActiveTrue());
createPatient(withActiveFalse());
// Only reindex one of them
ReindexJobParameters parameters = new ReindexJobParameters();
parameters.addUrl("Observation?status=final");
parameters.addUrl("Patient?");
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(3);
}
@Test
public void testReindexDeletedResources_byUrl_willRemoveDeletedResourceEntriesFromIndexTables(){
IIdType obsId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);

View File

@ -0,0 +1,116 @@
package ca.uhn.fhir.jpa.delete.job;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
private final RequestTenantPartitionInterceptor myPartitionInterceptor = new RequestTenantPartitionInterceptor();
@BeforeEach
public void before() {
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(true);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("TestPartition1"), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName("TestPartition2"), null);
RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
RequestPartitionId defaultPartition = RequestPartitionId.defaultPartition();
Observation observation1 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode()));
myObservationDao.create(observation1, new SystemRequestDetails().setRequestPartitionId(partition1));
Observation observation2 = buildResource("Observation", withStatus(Observation.ObservationStatus.REGISTERED.toCode()));
myObservationDao.create(observation2, new SystemRequestDetails().setRequestPartitionId(partition1));
Observation observation3 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode()));
myObservationDao.create(observation3, new SystemRequestDetails().setRequestPartitionId(partition2));
Patient patient1 = buildResource("Patient", withActiveTrue());
myPatientDao.create(patient1, new SystemRequestDetails().setRequestPartitionId(partition1));
Patient patient2 = buildResource("Patient", withActiveFalse());
myPatientDao.create(patient2, new SystemRequestDetails().setRequestPartitionId(partition2));
Patient patient3 = buildResource("Patient", withActiveFalse());
myPatientDao.create(patient3, new SystemRequestDetails().setRequestPartitionId(defaultPartition));
}
@AfterEach
public void after() {
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
}
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> twoPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.allPartitions());
return Stream.of(
// includes all resources from all partitions - partition 1, partition 2 and default partition
Arguments.of(List.of(), List.of(), false, 6),
// includes all Observations
Arguments.of(List.of("Observation?"), twoPartitions, false, 3),
// includes all Observations
Arguments.of(List.of("Observation?"), allPartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 0),
// includes Observations in partition 1
Arguments.of(List.of("Observation?"), partition1, true, 2),
// includes all Patients from all partitions - partition 1, partition 2 and default partition
Arguments.of(List.of("Patient?"), allPartitions, false, 3),
// includes Patients and Observations in partitions 1 and 2
Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, false, 5),
// includes Observations from partition 1 and Patients from partition 2
Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, true, 3),
// includes final Observations and Patients from partitions 1 and 2
Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, false, 4),
// includes final Observations from partition 1 and Patients from partition 2
Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, true, 2),
// includes final Observations and Patients from partition 1
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void testReindex_byMultipleUrlsAndPartitions_indexesMatchingResources(List<String> theUrls,
List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl,
int theExpectedIndexedResourceCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
// verify only resources matching URLs and partitions provided via parameters were re-indexed
assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(theExpectedIndexedResourceCount);
}
}

View File

@ -200,7 +200,7 @@ public class PartitioningInterceptorR4Test extends BaseJpaR4SystemTest {
myPatientDao.search(map);
fail();
} catch (InternalErrorException e) {
assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcut: STORAGE_PARTITION_IDENTIFY_READ", e.getMessage());
assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcuts: [STORAGE_PARTITION_IDENTIFY_ANY, STORAGE_PARTITION_IDENTIFY_READ]", e.getMessage());
}
}

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.rest.server.tenant.ITenantIdentificationStrategy;
import jakarta.annotation.Nonnull;
@ -57,6 +58,7 @@ public class RequestTenantPartitionInterceptor {
// We will use the tenant ID that came from the request as the partition name
String tenantId = theRequestDetails.getTenantId();
if (isBlank(tenantId)) {
// this branch is a no-op; it can happen when "partitioning.tenant_identification_strategy" is set to URL_BASED
if (theRequestDetails instanceof SystemRequestDetails) {
SystemRequestDetails requestDetails = (SystemRequestDetails) theRequestDetails;
if (requestDetails.getRequestPartitionId() != null) {
@ -67,6 +69,11 @@ public class RequestTenantPartitionInterceptor {
throw new InternalErrorException(Msg.code(343) + "No partition ID has been specified");
}
// for REQUEST_TENANT partition selection mode, allPartitions is supported when URL includes _ALL as the tenant
// else if no tenant is provided in the URL, DEFAULT will be used as per UrlBaseTenantIdentificationStrategy
if (tenantId.equals(ProviderConstants.ALL_PARTITIONS_TENANT_NAME)) {
return RequestPartitionId.allPartitions();
}
return RequestPartitionId.fromPartitionName(tenantId);
}
}
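
As an illustration (hypothetical request, not part of this diff): with URL-based tenant identification and the REQUEST_TENANT partition selection mode, a client could address every partition by using the `_ALL` tenant name in the request path, for example:

```
POST www.example.com/fhir/_ALL/$reindex
```

The interceptor above maps the `_ALL` tenant to RequestPartitionId.allPartitions(), which a provider such as JpaJobPartitionProvider then expands into the full list of configured partitions.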

View File

@ -53,6 +53,7 @@ public class ProviderConstants {
public static final String PARTITION_MANAGEMENT_PARTITION_DESC = "description";
public static final String DEFAULT_PARTITION_NAME = "DEFAULT";
public static final String ALL_PARTITIONS_TENANT_NAME = "_ALL";
/**
* Operation name: diff
@ -212,6 +213,7 @@ public class ProviderConstants {
/**
* Whether all resource types should be reindexed
*/
@Deprecated(since = "7.3.4")
public static final String OPERATION_REINDEX_PARAM_EVERYTHING = "everything";
/**

View File

@ -19,7 +19,7 @@
*/
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
@ -45,8 +45,8 @@ public class DeleteExpungeAppCtx {
public JobDefinition<DeleteExpungeJobParameters> expungeJobDefinition(
IBatch2DaoSvc theBatch2DaoSvc,
HapiTransactionService theHapiTransactionService,
IDeleteExpungeSvc theDeleteExpungeSvc,
IIdHelperService theIdHelperService,
IDeleteExpungeSvc<?> theDeleteExpungeSvc,
IIdHelperService<?> theIdHelperService,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DELETE_EXPUNGE)
@ -59,13 +59,13 @@ public class DeleteExpungeAppCtx {
.addFirstStep(
"generate-ranges",
"Generate data ranges to expunge",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
expungeGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",
"Load IDs of resources to expunge",
ResourceIdListWorkChunkJson.class,
new LoadIdsStep(theBatch2DaoSvc))
loadIdsStep(theBatch2DaoSvc))
.addLastStep(
"expunge",
"Perform the resource expunge",
@ -76,7 +76,7 @@ public class DeleteExpungeAppCtx {
@Bean
public DeleteExpungeJobParametersValidator expungeJobParametersValidator(
IBatch2DaoSvc theBatch2DaoSvc,
IDeleteExpungeSvc theDeleteExpungeSvc,
IDeleteExpungeSvc<?> theDeleteExpungeSvc,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return new DeleteExpungeJobParametersValidator(
new UrlListValidator(ProviderConstants.OPERATION_EXPUNGE, theBatch2DaoSvc),
@ -84,17 +84,22 @@ public class DeleteExpungeAppCtx {
theRequestPartitionHelperSvc);
}
@Bean
public LoadIdsStep<DeleteExpungeJobParameters> loadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@Bean
public DeleteExpungeStep expungeStep(
HapiTransactionService theHapiTransactionService,
IDeleteExpungeSvc theDeleteExpungeSvc,
IIdHelperService theIdHelperService) {
IDeleteExpungeSvc<?> theDeleteExpungeSvc,
IIdHelperService<?> theIdHelperService) {
return new DeleteExpungeStep(theHapiTransactionService, theDeleteExpungeSvc, theIdHelperService);
}
@Bean
public GenerateRangeChunksStep expungeGenerateRangeChunksStep() {
return new GenerateRangeChunksStep();
public GenerateRangeChunksStep<DeleteExpungeJobParameters> expungeGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean

View File

@ -19,10 +19,10 @@
*/
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DeleteExpungeJobParameters extends PartitionedUrlListJobParameters {
public class DeleteExpungeJobParameters extends JobParameters {
@JsonProperty("cascade")
private boolean myCascade;

View File

@ -159,10 +159,8 @@ public class BulkDataImportProvider {
RequestPartitionId partitionId =
myRequestPartitionHelperService.determineReadPartitionForRequestForServerOperation(
theRequestDetails, JpaConstants.OPERATION_IMPORT);
if (!partitionId.isAllPartitions()) {
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
jobParameters.setPartitionId(partitionId);
}
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
jobParameters.setPartitionId(partitionId);
// Extract all the URLs and order them in the order that is least
// likely to result in conflict (e.g. Patients before Observations

View File

@ -32,7 +32,7 @@ public class BulkImportAppCtx {
public static final int PARAM_MAXIMUM_BATCH_SIZE_DEFAULT = 800; // Avoid the 1000 SQL param limit
@Bean
public JobDefinition bulkImport2JobDefinition() {
public JobDefinition<BulkImportJobParameters> bulkImport2JobDefinition() {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_BULK_IMPORT_PULL)
.setJobDescription("FHIR Bulk Import using pull-based data source")

View File

@ -20,7 +20,10 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
@ -29,7 +32,6 @@ import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -51,20 +53,26 @@ public class ReindexAppCtx {
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
reindexGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",
"Load IDs of resources to reindex",
ResourceIdListWorkChunkJson.class,
new LoadIdsStep(theBatch2DaoSvc))
loadIdsStep(theBatch2DaoSvc))
.addLastStep("reindex", "Perform the resource reindex", reindexStep())
.build();
}
@Bean
public GenerateRangeChunksStep reindexGenerateRangeChunksStep() {
return new ReindexGenerateRangeChunksStep();
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> loadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@Bean
@ -82,8 +90,8 @@ public class ReindexAppCtx {
public ReindexProvider reindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobPartitionProvider theJobPartitionHandler,
UrlPartitioner theUrlPartitioner) {
return new ReindexProvider(theFhirContext, theJobCoordinator, theRequestPartitionHelperSvc, theUrlPartitioner);
return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler, theUrlPartitioner);
}
}

View File

@ -1,51 +0,0 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReindexGenerateRangeChunksStep extends GenerateRangeChunksStep<ReindexJobParameters> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexGenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink)
throws JobExecutionFailedException {
ReindexJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info(
"Beginning reindex job - OptimizeStorage[{}] - ReindexSearchParameters[{}]",
parameters.getOptimizeStorage(),
parameters.getReindexSearchParameters());
return super.run(theStepExecutionDetails, theDataSink);
}
}

View File

@ -19,14 +19,14 @@
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexJobParameters extends PartitionedUrlListJobParameters {
public class ReindexJobParameters extends JobParameters {
public static final String OPTIMIZE_STORAGE = "optimizeStorage";
public static final String REINDEX_SEARCH_PARAMETERS = "reindexSearchParameters";

View File

@ -20,13 +20,12 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
@ -50,7 +49,7 @@ public class ReindexProvider {
private final FhirContext myFhirContext;
private final IJobCoordinator myJobCoordinator;
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private final IJobPartitionProvider myJobPartitionProvider;
private final UrlPartitioner myUrlPartitioner;
/**
@ -59,11 +58,11 @@ public class ReindexProvider {
public ReindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobPartitionProvider theJobPartitionProvider,
UrlPartitioner theUrlPartitioner) {
myFhirContext = theFhirContext;
myJobCoordinator = theJobCoordinator;
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
myJobPartitionProvider = theJobPartitionProvider;
myUrlPartitioner = theUrlPartitioner;
}
@ -128,10 +127,9 @@ public class ReindexProvider {
.forEach(params::addPartitionedUrl);
}
RequestPartitionId requestPartition =
myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, ProviderConstants.OPERATION_REINDEX);
params.setRequestPartitionId(requestPartition);
myJobPartitionProvider
.getPartitions(theRequestDetails, ProviderConstants.OPERATION_REINDEX)
.forEach(params::addRequestPartitionId);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);

View File

@ -103,7 +103,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(theJobParameters.getRequestPartitionId())
.withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob);
return new RunOutcome(data.size());

View File

@ -37,8 +37,6 @@ public class ReindexJobParametersValidatorTest {
parameters.addUrl(theUrl);
// test
List<String> errors = myValidator.validate(null, parameters);
return errors;
return myValidator.validate(null, parameters);
}
}

View File

@ -1,9 +1,7 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@ -33,9 +31,15 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNotNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -59,6 +63,8 @@ public class ReindexProviderTest {
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private UrlPartitioner myUrlPartitioner;
@Mock
private IJobPartitionProvider myJobPartitionProvider;
@Captor
private ArgumentCaptor<JobInstanceStartRequest> myStartRequestCaptor;
@ -72,7 +78,7 @@ public class ReindexProviderTest {
when(myJobCoordinator.startInstance(isNotNull(), any()))
.thenReturn(createJobStartResponse());
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(any(), any())).thenReturn(RequestPartitionId.allPartitions());
when(myJobPartitionProvider.getPartitions(any(), any())).thenReturn(List.of(RequestPartitionId.allPartitions()));
}
private Batch2JobStartResponse createJobStartResponse() {
@ -96,11 +102,7 @@ public class ReindexProviderTest {
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(url);
partitionedUrl.setRequestPartitionId(RequestPartitionId.defaultPartition());
when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(partitionedUrl);
when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(new PartitionedUrl().setUrl(url).setRequestPartitionId(RequestPartitionId.defaultPartition()));
// Execute

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* Provides the list of partitions that a job should run against.
* TODO MM: Consider moving UrlPartitioner calls to this class once other batch operations need to support running
* across all partitions on a multitenant FHIR server.
* That way, all partitioning-related logic for batch jobs exists only here.
* After that, PartitionedUrl#myRequestPartitionId can be marked as deprecated.
*/
public interface IJobPartitionProvider {
/**
* Provides the list of partitions to run job steps against, based on the request that initiates the job.
* @param theRequestDetails the requestDetails
* @param theOperation the operation being run which corresponds to the job
* @return the list of partitions
*/
List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation);
// List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation, String theUrls);
}

View File

@ -21,18 +21,21 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
import ca.uhn.fhir.batch2.coordinator.SimpleJobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -139,4 +142,9 @@ public abstract class BaseBatch2Config {
protected int getConcurrentConsumers() {
return 4;
}
@Bean
public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return new SimpleJobPartitionProvider(theRequestPartitionHelperSvc);
}
}

View File

@ -0,0 +1,26 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* Basic implementation which provides the partition list for a given request; the list consists of a single partition.
*/
public class SimpleJobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public SimpleJobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
return List.of(partitionId);
}
}

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.batch2.jobs.chunk;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.util.JsonDateDeserializer;
import ca.uhn.fhir.rest.server.util.JsonDateSerializer;
@ -26,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Date;
@ -42,23 +44,61 @@ public class ChunkRangeJson implements IModelJson {
@Nonnull
private Date myEnd;
@Nullable
@JsonProperty("url")
private String myUrl;
@JsonProperty("resourceType")
private String myResourceType;
@Nullable
@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;
public ChunkRangeJson() {}
public ChunkRangeJson(@Nonnull Date theStart, @Nonnull Date theEnd) {
this.myStart = theStart;
this.myEnd = theEnd;
}
@Nonnull
public Date getStart() {
return myStart;
}
public ChunkRangeJson setStart(@Nonnull Date theStart) {
myStart = theStart;
return this;
}
@Nonnull
public Date getEnd() {
return myEnd;
}
public ChunkRangeJson setEnd(@Nonnull Date theEnd) {
myEnd = theEnd;
@Nullable
public String getUrl() {
return myUrl;
}
public ChunkRangeJson setUrl(@Nullable String theUrl) {
myUrl = theUrl;
return this;
}
@Nonnull
public String getResourceType() {
return myResourceType;
}
public ChunkRangeJson setResourceType(@Nullable String theResourceType) {
myResourceType = theResourceType;
return this;
}
@Nullable
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public ChunkRangeJson setPartitionId(@Nullable RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
return this;
}
}

View File

@ -1,39 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.chunk;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
public class PartitionedUrlChunkRangeJson extends ChunkRangeJson {
@Nullable
@JsonProperty("partitionedUrl")
private PartitionedUrl myPartitionedUrl;
@Nullable
public PartitionedUrl getPartitionedUrl() {
return myPartitionedUrl;
}
public void setPartitionedUrl(@Nullable PartitionedUrl thePartitionedUrl) {
myPartitionedUrl = thePartitionedUrl;
}
}

View File

@ -73,6 +73,7 @@ public class ResourceIdListWorkChunkJson implements IModelJson {
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("ids", myTypedPids)
.append("requestPartitionId", myRequestPartitionId)
.toString();
}

View File

@ -0,0 +1,116 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
/**
* Can be used to configure parameters for batch2 jobs.
* Please note that these need to be backward compatible as we do not have a way to migrate them to a different structure at the moment.
*/
public class JobParameters implements IModelJson {
@JsonProperty(value = "partitionId")
private List<RequestPartitionId> myRequestPartitionIds;
@JsonProperty("batchSize")
private Integer myBatchSize;
@JsonProperty("partitionedUrl")
private List<PartitionedUrl> myPartitionedUrls;
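// the single-partition accessors below are preserved so existing callers of
// setRequestPartitionId/getRequestPartitionId keep working; the value is stored as a one-element list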
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
if (theRequestPartitionId != null) {
myRequestPartitionIds = List.of(theRequestPartitionId);
}
}
@Nullable
public RequestPartitionId getRequestPartitionId() {
return getFirstRequestPartitionIdOrNull();
}
@Nullable
private RequestPartitionId getFirstRequestPartitionIdOrNull() {
return myRequestPartitionIds == null || myRequestPartitionIds.isEmpty() ? null : myRequestPartitionIds.get(0);
}
@Nonnull
public List<RequestPartitionId> getRequestPartitionIds() {
if (myRequestPartitionIds == null) {
myRequestPartitionIds = new ArrayList<>();
}
return myRequestPartitionIds;
}
public void addRequestPartitionId(RequestPartitionId theRequestPartitionId) {
getRequestPartitionIds().add(theRequestPartitionId);
}
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}
@Nullable
public Integer getBatchSize() {
return myBatchSize;
}
public List<PartitionedUrl> getPartitionedUrls() {
if (myPartitionedUrls == null) {
myPartitionedUrls = new ArrayList<>();
}
return myPartitionedUrls;
}
public void addPartitionedUrl(@Nonnull PartitionedUrl theUrl) {
getPartitionedUrls().add(theUrl);
}
public void addUrl(@Nonnull String theUrl) {
getPartitionedUrls().add(new PartitionedUrl().setUrl(theUrl));
}
@VisibleForTesting
public static JobParameters from(
List<String> theUrls, List<RequestPartitionId> thePartitions, boolean theShouldAssignPartitionToUrl) {
JobParameters parameters = new JobParameters();
if (theShouldAssignPartitionToUrl) {
assert theUrls.size() == thePartitions.size();
for (int i = 0; i < theUrls.size(); i++) {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(theUrls.get(i));
partitionedUrl.setRequestPartitionId(thePartitions.get(i));
parameters.addPartitionedUrl(partitionedUrl);
}
} else {
theUrls.forEach(url -> parameters.addPartitionedUrl(new PartitionedUrl().setUrl(url)));
thePartitions.forEach(parameters::addRequestPartitionId);
}
return parameters;
}
}

View File

@ -1,53 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
public class PartitionedJobParameters implements IModelJson {
@JsonProperty(value = "partitionId")
@Nullable
private RequestPartitionId myRequestPartitionId;
@JsonProperty("batchSize")
@Nullable
private Integer myBatchSize;
@Nullable
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}
@Nullable
public Integer getBatchSize() {
return myBatchSize;
}
}

View File

@ -48,15 +48,17 @@ public class PartitionedUrl implements IModelJson {
return myUrl;
}
public void setUrl(String theUrl) {
public PartitionedUrl setUrl(String theUrl) {
myUrl = theUrl;
return this;
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
public PartitionedUrl setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
return this;
}
}

View File

@ -1,53 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.List;
public class PartitionedUrlListJobParameters extends PartitionedJobParameters {
@JsonProperty("partitionedUrl")
@Nullable
private List<PartitionedUrl> myPartitionedUrls;
public List<PartitionedUrl> getPartitionedUrls() {
if (myPartitionedUrls == null) {
myPartitionedUrls = new ArrayList<>();
}
return myPartitionedUrls;
}
public PartitionedUrlListJobParameters addPartitionedUrl(@Nonnull PartitionedUrl thePartitionedUrl) {
Validate.notNull(thePartitionedUrl);
getPartitionedUrls().add(thePartitionedUrl);
return this;
}
public PartitionedUrlListJobParameters addUrl(@Nonnull String theUrl) {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(theUrl);
return addPartitionedUrl(partitionedUrl);
}
}

View File

@ -0,0 +1,50 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
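/**
* Default {@link IIdChunkProducer} which resolves a chunk's date range, URL and target partition
* into a stream of resource pids via {@link IBatch2DaoSvc}.
*/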
public class ChunkProducer implements IIdChunkProducer<ChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2DaoSvc myBatch2DaoSvc;
public ChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) {
myBatch2DaoSvc = theBatch2DaoSvc;
}
@Override
public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) {
String theUrl = theData.getUrl();
RequestPartitionId targetPartitionId = theData.getPartitionId();
ourLog.info(
"Fetching resource ID chunk in partition {} for URL {} - Range {} - {}",
targetPartitionId,
theUrl,
theData.getStart(),
theData.getEnd());
return myBatch2DaoSvc.fetchResourceIdStream(theData.getStart(), theData.getEnd(), targetPartitionId, theUrl);
}
}

View File

@ -25,49 +25,103 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.thymeleaf.util.StringUtils;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters>
implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> {
public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirstJobStepWorker<PT, ChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink)
@Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
throws JobExecutionFailedException {
PT params = theStepExecutionDetails.getParameters();
Date start = BATCH_START_DATE;
Date end = new Date();
if (params.getPartitionedUrls().isEmpty()) {
ourLog.info("Searching for All Resources from {} to {}", start, end);
PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
} else {
for (PartitionedUrl nextPartitionedUrl : params.getPartitionedUrls()) {
ourLog.info("Searching for [{}]] from {} to {}", nextPartitionedUrl, start, end);
PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
nextRange.setPartitionedUrl(nextPartitionedUrl);
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
// Partitions can be configured in either of the following lists, both of which are optional.
// The code below handles every combination of the two lists.
// The logic can be simplified once PartitionedUrl.myRequestPartitionId is deprecated.
// @see IJobPartitionProvider
List<RequestPartitionId> partitionIds = params.getRequestPartitionIds();
List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls();
if (partitionIds.isEmpty()) {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
sendChunk(chunkRangeJson, theDataSink);
return RunOutcome.SUCCESS;
}
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId partitionId = partitionedUrl.getRequestPartitionId();
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(partitionId);
sendChunk(chunkRangeJson, theDataSink);
});
return RunOutcome.SUCCESS;
}
partitionIds.forEach(partitionId -> {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end).setPartitionId(partitionId);
sendChunk(chunkRangeJson, theDataSink);
return;
}
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId urlPartitionId = partitionedUrl.getRequestPartitionId();
RequestPartitionId narrowPartitionId = determineNarrowPartitionId(partitionId, urlPartitionId);
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(narrowPartitionId);
sendChunk(chunkRangeJson, theDataSink);
});
});
return RunOutcome.SUCCESS;
}
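/**
 * Chooses the more specific of the two partition ids: the partition configured on the PartitionedUrl takes
 * precedence when the job-level partition is all-partitions, or when the job-level partition is the default
 * partition and the URL-level partition names a specific non-default partition; otherwise the job-level
 * partition is returned unchanged.
 */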
private RequestPartitionId determineNarrowPartitionId(
@Nonnull RequestPartitionId theRequestPartitionId,
@Nullable RequestPartitionId theOtherRequestPartitionId) {
if (theOtherRequestPartitionId == null) {
return theRequestPartitionId;
}
if (theRequestPartitionId.isAllPartitions() && !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
if (theRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
return theRequestPartitionId;
}
private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) {
String url = theData.getUrl();
ourLog.info(
"Creating chunks for [{}] from {} to {} for partition {}",
!StringUtils.isEmpty(url) ? url : "everything",
theData.getStart(),
theData.getEnd(),
theData.getPartitionId());
theDataSink.accept(theData);
}
}

View File

@ -27,13 +27,18 @@ import jakarta.annotation.Nullable;
import java.util.Date;
/**
* A service that produces pages of resource pids based on the data provided by a previous batch step. Typically the
* A service that produces pages of resource pids based on the data provided by a previous batch step. Typically, the
* first step in a batch job produces work chunks that define what types of data the batch operation will be performing
* (e.g. a list of resource types and date ranges). This service is then used by the second step to actually query and
* page through resource pids based on the chunk definitions produced by the first step.
* @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc).
* @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc.).
*/
public interface IIdChunkProducer<IT extends ChunkRangeJson> {
IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData);
@Deprecated(since = "7.3.7")
default IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData) {
return fetchResourceIdStream(theData);
}
IResourcePidStream fetchResourceIdStream(IT theData);
}
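As a caller-side sketch of the new single-argument contract: the chunk itself now carries the date range, target partition and optional URL, so the producer needs no extra arguments. This assumes an already wired IBatch2DaoSvc (theBatch2DaoSvc below) and uses illustrative partition and URL values; ChunkRangeJson, ChunkProducer and BATCH_START_DATE are the types and constant used elsewhere in this change.
IIdChunkProducer<ChunkRangeJson> producer = new ChunkProducer(theBatch2DaoSvc); // illustrative wiring
ChunkRangeJson range = new ChunkRangeJson(BATCH_START_DATE, new Date())
    .setPartitionId(RequestPartitionId.fromPartitionId(1)) // illustrative partition
    .setUrl("Patient?active=true"); // illustrative URL
IResourcePidStream pidStream = producer.fetchResourceIdStream(range);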

View File

@ -24,35 +24,25 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
public class LoadIdsStep
implements IJobStepWorker<
PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> {
private static final Logger ourLog = getLogger(LoadIdsStep.class);
private final ResourceIdListStep<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson>
myResourceIdListStep;
public class LoadIdsStep<PT extends JobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<PT> myResourceIdListStep;
public LoadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
IIdChunkProducer<PartitionedUrlChunkRangeJson> idChunkProducer =
new PartitionedUrlListIdChunkProducer(theBatch2DaoSvc);
IIdChunkProducer<ChunkRangeJson> idChunkProducer = new ChunkProducer(theBatch2DaoSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull
StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson>
theStepExecutionDetails,
@Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);

View File

@ -1,67 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import java.util.Date;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class PartitionedUrlListIdChunkProducer implements IIdChunkProducer<PartitionedUrlChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2DaoSvc myBatch2DaoSvc;
public PartitionedUrlListIdChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) {
myBatch2DaoSvc = theBatch2DaoSvc;
}
@Override
public IResourcePidStream fetchResourceIdStream(
Date theStart,
Date theEnd,
@Nullable RequestPartitionId theRequestPartitionId,
PartitionedUrlChunkRangeJson theData) {
PartitionedUrl partitionedUrl = theData.getPartitionedUrl();
RequestPartitionId targetPartitionId;
String theUrl;
if (partitionedUrl == null) {
theUrl = null;
targetPartitionId = theRequestPartitionId;
ourLog.info("Fetching resource ID chunk for everything - Range {} - {}", theStart, theEnd);
} else {
theUrl = partitionedUrl.getUrl();
targetPartitionId = defaultIfNull(partitionedUrl.getRequestPartitionId(), theRequestPartitionId);
ourLog.info(
"Fetching resource ID chunk for URL {} - Range {} - {}", partitionedUrl.getUrl(), theStart, theEnd);
}
return myBatch2DaoSvc.fetchResourceIdStream(theStart, theEnd, targetPartitionId, theUrl);
}
}

View File

@ -27,7 +27,7 @@ import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.util.Logs;
@ -42,40 +42,35 @@ import java.util.stream.Stream;
import static ca.uhn.fhir.util.StreamUtil.partition;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends ChunkRangeJson>
implements IJobStepWorker<PT, IT, ResourceIdListWorkChunkJson> {
public class ResourceIdListStep<PT extends JobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
public static final int DEFAULT_PAGE_SIZE = 20000;
protected static final int MAX_BATCH_OF_IDS = 500;
private final IIdChunkProducer<IT> myIdChunkProducer;
private final IIdChunkProducer<ChunkRangeJson> myIdChunkProducer;
public ResourceIdListStep(IIdChunkProducer<IT> theIdChunkProducer) {
public ResourceIdListStep(IIdChunkProducer<ChunkRangeJson> theIdChunkProducer) {
myIdChunkProducer = theIdChunkProducer;
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<PT, IT> theStepExecutionDetails,
@Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
IT data = theStepExecutionDetails.getData();
ChunkRangeJson data = theStepExecutionDetails.getData();
Date start = data.getStart();
Date end = data.getEnd();
Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize();
ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
RequestPartitionId requestPartitionId =
theStepExecutionDetails.getParameters().getRequestPartitionId();
ourLog.info("Beginning to submit chunks in range {} to {}", start, end);
int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS);
final IResourcePidStream searchResult = myIdChunkProducer.fetchResourceIdStream(
start, end, requestPartitionId, theStepExecutionDetails.getData());
final IResourcePidStream searchResult =
myIdChunkProducer.fetchResourceIdStream(theStepExecutionDetails.getData());
searchResult.visitStreamNoResult(typedResourcePidStream -> {
AtomicInteger totalIdsFound = new AtomicInteger();
@ -102,7 +97,7 @@ public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends
if (theTypedPids.isEmpty()) {
return;
}
ourLog.info("Submitting work chunk with {} IDs", theTypedPids.size());
ourLog.info("Submitting work chunk in partition {} with {} IDs", theRequestPartitionId, theTypedPids.size());
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(theTypedPids, theRequestPartitionId);
ourLog.debug("IDs are: {}", data);
theDataSink.accept(data);

View File

@ -256,7 +256,7 @@ public class JobDefinition<PT extends IModelJson> {
* Adds a processing step for this job.
*
* @param theStepId A unique identifier for this step. This only needs to be unique within the scope
* of the individual job definition (i.e. diuplicates are fine for different jobs, or
* of the individual job definition (i.e. duplicates are fine for different jobs, or
* even different versions of the same job)
* @param theStepDescription A description of this step
* @param theStepWorker The worker that will actually perform this step

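For context, a sketch of how these step-registration methods are typically chained when a batch2 job is defined. The add*Step signatures match the MDM job definitions later in this change; the surrounding builder calls (newBuilder, setJobDefinitionId, setJobDefinitionVersion, setJobDescription, setParametersType) are recalled from the wider codebase rather than taken from this diff, and the step ids, descriptions and workers are illustrative only.
JobDefinition<JobParameters> definition = JobDefinition.newBuilder()
    .setJobDefinitionId("EXAMPLE_JOB") // step ids below only need to be unique within this definition
    .setJobDefinitionVersion(1)
    .setJobDescription("Example multi-step batch2 job")
    .setParametersType(JobParameters.class)
    .addFirstStep("generate-ranges", "Generate chunk ranges", ChunkRangeJson.class, new GenerateRangeChunksStep<>())
    .addIntermediateStep("load-ids", "Load the IDs", ResourceIdListWorkChunkJson.class, new LoadIdsStep<>(theBatch2DaoSvc))
    .addLastStep("process-ids", "Process the IDs", theFinalStepWorker) // hypothetical final step worker
    .build();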
View File

@ -82,6 +82,12 @@ public class JobInstanceStartRequest implements IModelJson {
myParameters = theParameters;
}
/**
* Sets the parameters for the job.
* Please note that the parameter structure must remain backwards compatible, because there is currently no mechanism for migrating previously persisted parameters to a different structure.
* @param theParameters the parameters
* @return the current instance.
*/
public JobInstanceStartRequest setParameters(IModelJson theParameters) {
myParameters = JsonUtil.serializeWithSensitiveData(theParameters);
return this;

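As a usage sketch for starting a job with the new multi-partition parameters: JobParameters.from is the factory exercised in GenerateRangeChunksStepTest later in this change, while the job definition id string and the URL/partition values are illustrative only.
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId("REINDEX"); // illustrative job definition id
// parameters are serialized to JSON, so their structure must stay backwards compatible
startRequest.setParameters(JobParameters.from(
    List.of("Observation?", "Patient?"),
    List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2)),
    false));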
View File

@ -0,0 +1,43 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SimpleJobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@InjectMocks
private SimpleJobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List<RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
}

View File

@ -0,0 +1,53 @@
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Date;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class ChunkProducerTest {
@Mock
private IBatch2DaoSvc myBatch2DaoSvc;
@InjectMocks
private ChunkProducer myChunkProducer;
@Test
public void fetchResourceIdStream_worksAsExpected() {
// setup
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
String url = "Patient?";
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(BATCH_START_DATE, new Date()).setPartitionId(partitionId).setUrl(url);
IResourcePidStream stream = mock(IResourcePidStream.class);
when(myBatch2DaoSvc.fetchResourceIdStream(any(), any(), any(), any())).thenReturn(stream);
// test
IResourcePidStream actualStream = myChunkProducer.fetchResourceIdStream(chunkRangeJson);
// verify
assertThat(actualStream).isSameAs(stream);
ArgumentCaptor<Date> dateCaptor = ArgumentCaptor.forClass(Date.class);
ArgumentCaptor<RequestPartitionId> partitionCaptor = ArgumentCaptor.forClass(RequestPartitionId.class);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(myBatch2DaoSvc).fetchResourceIdStream(dateCaptor.capture(), dateCaptor.capture(), partitionCaptor.capture(), urlCaptor.capture());
assertThat(dateCaptor.getAllValues()).containsExactly(chunkRangeJson.getStart(), chunkRangeJson.getEnd());
assertThat(partitionCaptor.getValue()).isEqualTo(partitionId);
assertThat(urlCaptor.getValue()).isEqualTo(url);
}
}

View File

@ -0,0 +1,140 @@
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class GenerateRangeChunksStepTest {
private final GenerateRangeChunksStep<JobParameters> myStep = new GenerateRangeChunksStep<>();
@Mock
private StepExecutionDetails<JobParameters, VoidModel> myStepExecutionDetails;
@Mock
private IJobDataSink<ChunkRangeJson> myJobDataSink;
private static final Date START = BATCH_START_DATE;
private static final Date END = new Date();
@BeforeEach
void setUp() {
}
@AfterEach
void tearDown() {
}
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> threePartitions = List.of(
RequestPartitionId.fromPartitionId(1),
RequestPartitionId.fromPartitionId(2),
RequestPartitionId.fromPartitionId(3)
);
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
// the actual values (URLs, partitionId) don't matter, but we add values similar to real hapi-fhir use-cases
return Stream.of(
Arguments.of(List.of(), List.of(), false, 1),
Arguments.of(List.of(), partition1, false, 1),
Arguments.of(List.of("Observation?"), threePartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 1),
Arguments.of(List.of("Observation?"), partition1, true, 1),
Arguments.of(List.of("Observation?", "Patient?"), threePartitions, false, 6),
Arguments.of(List.of("Observation?", "Patient?", "Practitioner?"), threePartitions, true, 3),
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2),
Arguments.of(List.of("Observation?status=final"), threePartitions, false, 3)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void run_withParameters_producesExpectedChunks(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
when(myStepExecutionDetails.getParameters()).thenReturn(parameters);
myStep.run(myStepExecutionDetails, myJobDataSink);
ArgumentCaptor<ChunkRangeJson> captor = ArgumentCaptor.forClass(ChunkRangeJson.class);
verify(myJobDataSink, times(theExpectedChunkCount)).accept(captor.capture());
List<ChunkRangeJson> chunkRangeJsonList = getExpectedChunkList(theUrls, thePartitions, theShouldAssignPartitionToUrl, theExpectedChunkCount);
RequestPartitionId[] actualPartitionIds = captor.getAllValues().stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]);
RequestPartitionId[] expectedPartitionIds = chunkRangeJsonList.stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]);
assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds);
String[] actualUrls = captor.getAllValues().stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]);
String[] expectedUrls = chunkRangeJsonList.stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]);
assertThat(actualUrls).containsExactlyInAnyOrder(expectedUrls);
}
private List<ChunkRangeJson> getExpectedChunkList(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
List<ChunkRangeJson> chunkRangeJsonList = new ArrayList<>();
if (theShouldAssignPartitionToUrl) {
for (int i = 0; i < theExpectedChunkCount; i++) {
String url = theUrls.get(i);
RequestPartitionId partition = thePartitions.get(i);
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
if (theUrls.isEmpty() && thePartitions.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END);
chunkRangeJsonList.add(chunkRangeJson);
return chunkRangeJsonList;
}
if (theUrls.isEmpty()) {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
if (thePartitions.isEmpty()) {
for (String url : theUrls) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
theUrls.forEach(url -> {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
});
return chunkRangeJsonList;
}
}

View File

@ -2,9 +2,9 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
@ -48,11 +48,11 @@ public class LoadIdsStepTest {
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> mySink;
private LoadIdsStep mySvc;
private LoadIdsStep<JobParameters> mySvc;
@BeforeEach
public void before() {
mySvc = new LoadIdsStep(myBatch2DaoSvc);
mySvc = new LoadIdsStep<>(myBatch2DaoSvc);
}
@Captor
@ -60,18 +60,16 @@ public class LoadIdsStepTest {
@Test
public void testGenerateSteps() {
PartitionedUrlListJobParameters parameters = new PartitionedUrlListJobParameters();
PartitionedUrlChunkRangeJson range = new PartitionedUrlChunkRangeJson();
range.setStart(DATE_1).setEnd(DATE_END);
JobParameters parameters = new JobParameters();
ChunkRangeJson range = new ChunkRangeJson(DATE_1, DATE_END);
String instanceId = "instance-id";
JobInstance jobInstance = JobInstance.fromInstanceId(instanceId);
String chunkId = "chunk-id";
StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
StepExecutionDetails<JobParameters, ChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
// First Execution
when(myBatch2DaoSvc.fetchResourceIdStream(eq(DATE_1), eq(DATE_END), isNull(), isNull()))
.thenReturn(createIdChunk(0L, 20000L, DATE_2));
when(myBatch2DaoSvc.fetchResourceIdStream(eq(DATE_1), eq(DATE_END), isNull(), isNull())).thenReturn(createIdChunk());
mySvc.run(details, mySink);
@ -98,13 +96,12 @@ public class LoadIdsStepTest {
}
@Nonnull
private IResourcePidStream createIdChunk(long idLow, long idHigh, Date lastDate) {
private IResourcePidStream createIdChunk() {
List<IResourcePersistentId> ids = new ArrayList<>();
List<String> resourceTypes = new ArrayList<>();
for (long i = idLow; i < idHigh; i++) {
for (long i = 0; i < 20000; i++) {
ids.add(JpaPid.fromId(i));
}
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, lastDate, null);
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, DATE_2, null);
return new ListWrappingPidStream(chunk);
}

View File

@ -3,9 +3,10 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream;
@ -35,20 +36,20 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ResourceIdListStepTest {
@Mock
private IIdChunkProducer<PartitionedUrlChunkRangeJson> myIdChunkProducer;
private IIdChunkProducer<ChunkRangeJson> myIdChunkProducer;
@Mock
private StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myStepExecutionDetails;
private StepExecutionDetails<JobParameters, ChunkRangeJson> myStepExecutionDetails;
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> myDataSink;
@Mock
private PartitionedUrlChunkRangeJson myData;
private ChunkRangeJson myData;
@Mock
private PartitionedUrlListJobParameters myParameters;
private JobParameters myParameters;
@Captor
private ArgumentCaptor<ResourceIdListWorkChunkJson> myDataCaptor;
private ResourceIdListStep<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myResourceIdListStep;
private ResourceIdListStep<JobParameters> myResourceIdListStep;
@BeforeEach
void beforeEach() {
@ -59,11 +60,13 @@ class ResourceIdListStepTest {
@ValueSource(ints = {0, 1, 100, 500, 501, 2345, 10500})
void testResourceIdListBatchSizeLimit(int theListSize) {
List<IResourcePersistentId> idList = generateIdList(theListSize);
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myStepExecutionDetails.getData()).thenReturn(myData);
when(myParameters.getBatchSize()).thenReturn(500);
when(myStepExecutionDetails.getParameters()).thenReturn(myParameters);
IResourcePidStream mockStream = new ListWrappingPidStream(
new HomogeneousResourcePidList("Patient", idList, null, null));
IResourcePidStream resourcePidStream = new ListWrappingPidStream(
new HomogeneousResourcePidList("Patient", idList, null, partitionId));
if (theListSize > 0) {
// Ensure none of the work chunks exceed MAX_BATCH_OF_IDS in size:
doAnswer(i -> {
@ -73,8 +76,7 @@ class ResourceIdListStepTest {
return null;
}).when(myDataSink).accept(any(ResourceIdListWorkChunkJson.class));
}
when(myIdChunkProducer.fetchResourceIdStream(any(), any(), any(), any()))
.thenReturn(mockStream);
when(myIdChunkProducer.fetchResourceIdStream(any())).thenReturn(resourcePidStream);
final RunOutcome run = myResourceIdListStep.run(myStepExecutionDetails, myDataSink);
assertThat(run).isNotEqualTo(null);
@ -87,7 +89,9 @@ class ResourceIdListStepTest {
// Ensure that all chunks except the very last one are MAX_BATCH_OF_IDS in length
for (int i = 0; i < expectedBatchCount - 1; i++) {
assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, allDataChunks.get(i).size());
ResourceIdListWorkChunkJson dataChunk = allDataChunks.get(i);
assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, dataChunk.size());
assertEquals(partitionId, dataChunk.getRequestPartitionId());
}
// The very last chunk should be whatever is left over (if there is a remainder):

View File

@ -30,7 +30,7 @@
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage-batch2</artifactId>
<artifactId>hapi-fhir-storage-batch2-jobs</artifactId>
<version>${project.version}</version>
</dependency>

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep;
@ -32,11 +33,11 @@ import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import jakarta.annotation.Nonnull;
public class LoadGoldenIdsStep
implements IJobStepWorker<MdmClearJobParameters, MdmChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<MdmClearJobParameters, MdmChunkRangeJson> myResourceIdListStep;
implements IJobStepWorker<MdmClearJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<MdmClearJobParameters> myResourceIdListStep;
public LoadGoldenIdsStep(IGoldenResourceSearchSvc theGoldenResourceSearchSvc) {
IIdChunkProducer<MdmChunkRangeJson> idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc);
IIdChunkProducer<ChunkRangeJson> idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}
@ -44,7 +45,7 @@ public class LoadGoldenIdsStep
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<MdmClearJobParameters, MdmChunkRangeJson> theStepExecutionDetails,
@Nonnull StepExecutionDetails<MdmClearJobParameters, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.jobs.config.BatchCommonCtx;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
@ -34,7 +35,7 @@ import static ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx.MDM_CLEAR_JOB_BEAN_NAM
import static ca.uhn.fhir.mdm.batch2.submit.MdmSubmitAppCtx.MDM_SUBMIT_JOB_BEAN_NAME;
@Configuration
@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class})
@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class, BatchCommonCtx.class})
public class MdmBatch2Config {
@Bean
MdmJobDefinitionLoader mdmJobDefinitionLoader(

View File

@ -1,40 +0,0 @@
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
public class MdmChunkRangeJson extends ChunkRangeJson {
@Nonnull
@JsonProperty("resourceType")
private String myResourceType;
@Nonnull
public String getResourceType() {
return myResourceType;
}
public void setResourceType(@Nullable String theResourceType) {
myResourceType = theResourceType;
}
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.util.Batch2Utils;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import jakarta.annotation.Nonnull;
@ -33,14 +34,14 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJobParameters, MdmChunkRangeJson> {
public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJobParameters, ChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(MdmGenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<MdmClearJobParameters, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<MdmChunkRangeJson> theDataSink)
@Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
throws JobExecutionFailedException {
MdmClearJobParameters params = theStepExecutionDetails.getParameters();
@ -49,10 +50,10 @@ public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJ
for (String nextResourceType : params.getResourceNames()) {
ourLog.info("Initiating mdm clear of [{}]] Golden Resources from {} to {}", nextResourceType, start, end);
MdmChunkRangeJson nextRange = new MdmChunkRangeJson();
nextRange.setResourceType(nextResourceType);
nextRange.setStart(start);
nextRange.setEnd(end);
assert nextResourceType != null;
ChunkRangeJson nextRange = new ChunkRangeJson(start, end)
.setResourceType(nextResourceType)
.setPartitionId(params.getRequestPartitionId());
theDataSink.accept(nextRange);
}

View File

@ -19,17 +19,14 @@
*/
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
public class MdmIdChunkProducer implements IIdChunkProducer<ChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(MdmIdChunkProducer.class);
private final IGoldenResourceSearchSvc myGoldenResourceSearchSvc;
@ -38,17 +35,16 @@ public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
}
@Override
public IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, MdmChunkRangeJson theData) {
public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) {
String resourceType = theData.getResourceType();
ourLog.info(
"Fetching golden resource ID chunk for resource type {} - Range {} - {}",
resourceType,
theStart,
theEnd);
theData.getStart(),
theData.getEnd());
return myGoldenResourceSearchSvc.fetchGoldenResourceIdStream(
theStart, theEnd, theRequestPartitionId, resourceType);
theData.getStart(), theData.getEnd(), theData.getPartitionId(), resourceType);
}
}

View File

@ -19,13 +19,13 @@
*/
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.batch2.LoadGoldenIdsStep;
import ca.uhn.fhir.mdm.batch2.MdmChunkRangeJson;
import ca.uhn.fhir.mdm.batch2.MdmGenerateRangeChunksStep;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -50,7 +50,7 @@ public class MdmClearAppCtx {
.addFirstStep(
"generate-ranges",
"Generate date ranges to Mdm Clear",
MdmChunkRangeJson.class,
ChunkRangeJson.class,
mdmGenerateRangeChunksStep())
.addIntermediateStep(
"find-golden-resource-ids",

View File

@ -19,7 +19,7 @@
*/
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.validation.constraints.Pattern;
@ -28,7 +28,7 @@ import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.List;
public class MdmClearJobParameters extends PartitionedJobParameters {
public class MdmClearJobParameters extends JobParameters {
@JsonProperty("resourceType")
@Nonnull
private List<@Pattern(regexp = "^[A-Z][A-Za-z]+$", message = "If populated, must be a valid resource type") String>

View File

@ -75,8 +75,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
SystemRequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(100);
requestDetails.setRequestPartitionId(
theStepExecutionDetails.getParameters().getRequestPartitionId());
requestDetails.setRequestPartitionId(theStepExecutionDetails.getData().getRequestPartitionId());
TransactionDetails transactionDetails = new TransactionDetails();
myHapiTransactionService.execute(
requestDetails,

View File

@ -19,7 +19,9 @@
*/
package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
@ -37,13 +39,8 @@ public class MdmSubmitAppCtx {
public static final String MDM_SUBMIT_JOB_BEAN_NAME = "mdmSubmitJobDefinition";
public static String MDM_SUBMIT_JOB = "MDM_SUBMIT";
@Bean
public GenerateRangeChunksStep submitGenerateRangeChunksStep() {
return new GenerateRangeChunksStep();
}
@Bean(name = MDM_SUBMIT_JOB_BEAN_NAME)
public JobDefinition mdmSubmitJobDefinition(
public JobDefinition<MdmSubmitJobParameters> mdmSubmitJobDefinition(
IBatch2DaoSvc theBatch2DaoSvc,
MatchUrlService theMatchUrlService,
FhirContext theFhirContext,
@ -58,10 +55,10 @@ public class MdmSubmitAppCtx {
.addFirstStep(
"generate-ranges",
"generate data ranges to submit to mdm",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
submitGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids", "Load the IDs", ResourceIdListWorkChunkJson.class, new LoadIdsStep(theBatch2DaoSvc))
"load-ids", "Load the IDs", ResourceIdListWorkChunkJson.class, loadIdsStep(theBatch2DaoSvc))
.addLastStep(
"inflate-and-submit-resources",
"Inflate and Submit resources",
@ -76,7 +73,18 @@ public class MdmSubmitAppCtx {
}
@Bean
public MdmInflateAndSubmitResourcesStep mdmInflateAndSubmitResourcesStep() {
public GenerateRangeChunksStep<MdmSubmitJobParameters> submitGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean
public LoadIdsStep<MdmSubmitJobParameters> loadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@Bean
public IJobStepWorker<MdmSubmitJobParameters, ResourceIdListWorkChunkJson, VoidModel>
mdmInflateAndSubmitResourcesStep() {
return new MdmInflateAndSubmitResourcesStep();
}
}

View File

@ -19,6 +19,6 @@
*/
package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
public class MdmSubmitJobParameters extends PartitionedUrlListJobParameters {}
public class MdmSubmitJobParameters extends JobParameters {}

View File

@ -54,25 +54,25 @@ public class MdmSubmitJobParametersValidator implements IJobParametersValidator<
String resourceType = getResourceTypeFromUrl(url);
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resourceType);
validateTypeIsUsedByMdm(errorMsgs, resourceType);
validateAllSearchParametersApplyToResourceType(errorMsgs, partitionedUrl, resourceType, resourceDefinition);
validateAllSearchParametersApplyToResourceType(errorMsgs, url, resourceType, resourceDefinition);
}
return errorMsgs;
}
private void validateAllSearchParametersApplyToResourceType(
List<String> errorMsgs,
PartitionedUrl partitionedUrl,
String resourceType,
List<String> theErrorMessages,
String theUrl,
String theResourceType,
RuntimeResourceDefinition resourceDefinition) {
try {
myMatchUrlService.translateMatchUrl(partitionedUrl.getUrl(), resourceDefinition);
myMatchUrlService.translateMatchUrl(theUrl, resourceDefinition);
} catch (MatchUrlService.UnrecognizedSearchParameterException e) {
String errorMsg = String.format(
"Search parameter %s is not recognized for resource type %s. Source error is %s",
e.getParamName(), resourceType, e.getMessage());
errorMsgs.add(errorMsg);
e.getParamName(), theResourceType, e.getMessage());
theErrorMessages.add(errorMsg);
} catch (InvalidRequestException e) {
errorMsgs.add("Invalid request detected: " + e.getMessage());
theErrorMessages.add("Invalid request detected: " + e.getMessage());
}
}

View File

@ -38,8 +38,11 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -50,6 +53,7 @@ import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.doCal
import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.hasHooks;
public abstract class BaseRequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
private static final Logger ourLog = LoggerFactory.getLogger(BaseRequestPartitionHelperSvc.class);
private final HashSet<Object> myNonPartitionableResourceNames;
@ -95,81 +99,83 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Override
public RequestPartitionId determineReadPartitionForRequest(
@Nullable RequestDetails theRequest, @Nonnull ReadPartitionIdRequestDetails theDetails) {
RequestPartitionId requestPartitionId;
String resourceType = theDetails.getResourceType();
boolean nonPartitionableResource = !isResourcePartitionable(resourceType);
if (myPartitionSettings.isPartitioningEnabled()) {
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if (requestDetails == null) {
requestDetails = new SystemRequestDetails();
}
// Handle system requests
if (requestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails)
&& !nonPartitionableResource) {
requestPartitionId =
getSystemRequestPartitionId((SystemRequestDetails) requestDetails, nonPartitionableResource);
} else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) {
return RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId());
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails)
.add(ReadPartitionIdRequestDetails.class, theDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
} else {
requestPartitionId = null;
}
validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
return validateNormalizeAndNotifyHooksForRead(requestPartitionId, requestDetails, resourceType);
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
return RequestPartitionId.allPartitions();
// certain use-cases (e.g. batch2 jobs) only have the resource type populated in the ReadPartitionIdRequestDetails
// TODO MM: see if we can make RequestDetails consistent
String resourceType = theDetails.getResourceType();
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if (requestDetails == null) {
requestDetails = new SystemRequestDetails();
}
boolean nonPartitionableResource = isResourceNonPartitionable(resourceType);
RequestPartitionId requestPartitionId = null;
// Handle system requests
if (requestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails)
&& !nonPartitionableResource) {
requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) requestDetails, false);
} else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) {
requestPartitionId = RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId());
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails)
.add(ReadPartitionIdRequestDetails.class, theDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
}
validateRequestPartitionNotNull(
requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
return validateAndNormalizePartition(requestPartitionId, requestDetails, resourceType);
}
@Override
public RequestPartitionId determineGenericPartitionForRequest(RequestDetails theRequestDetails) {
RequestPartitionId retVal = null;
RequestPartitionId requestPartitionId = null;
if (myPartitionSettings.isPartitioningEnabled()) {
if (theRequestDetails instanceof SystemRequestDetails) {
SystemRequestDetails systemRequestDetails = (SystemRequestDetails) theRequestDetails;
retVal = systemRequestDetails.getRequestPartitionId();
}
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
if (retVal == null) {
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
retVal = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
if (retVal != null) {
retVal = validateNormalizeAndNotifyHooksForRead(retVal, theRequestDetails, null);
}
}
if (theRequestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) theRequestDetails)) {
requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) theRequestDetails);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
}
return retVal;
// TODO MM: at the moment it is ok for this method to return null
// check whether it can be made consistent and what the implications of that would be
// validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY);
if (requestPartitionId != null) {
return validateAndNormalizePartition(
requestPartitionId, theRequestDetails, theRequestDetails.getResourceName());
}
return null;
}
/**
@ -216,56 +222,58 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Nonnull
@Override
public RequestPartitionId determineCreatePartitionForRequest(
@Nullable RequestDetails theRequest, @Nonnull IBaseResource theResource, @Nonnull String theResourceType) {
RequestPartitionId requestPartitionId;
@Nullable final RequestDetails theRequest,
@Nonnull IBaseResource theResource,
@Nonnull String theResourceType) {
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if ((theRequest == null || theRequest instanceof SystemRequestDetails) && nonPartitionableResource) {
return RequestPartitionId.defaultPartition();
if (theRequest == null) {
requestDetails = new SystemRequestDetails();
}
boolean nonPartitionableResource = isResourceNonPartitionable(theResourceType);
RequestPartitionId requestPartitionId = null;
if (theRequest instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) theRequest)) {
requestPartitionId =
getSystemRequestPartitionId((SystemRequestDetails) theRequest, nonPartitionableResource);
} else {
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequest)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else {
// This is an external Request (e.g. ServletRequestDetails) so we want to figure out the partition
// via interceptor.
// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
HookParams params = new HookParams()
.add(IBaseResource.class, theResource)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
}
// If the interceptors haven't selected a partition, and it's a non-partitionable resource anyhow, send
// to DEFAULT
if (nonPartitionableResource && requestPartitionId == null) {
requestPartitionId = RequestPartitionId.defaultPartition();
}
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
HookParams params = new HookParams()
.add(IBaseResource.class, theResource)
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
}
String resourceName = myFhirContext.getResourceType(theResource);
validateSinglePartitionForCreate(requestPartitionId, resourceName, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE);
// If the interceptors haven't selected a partition, and it's a non-partitionable resource anyhow, send
// to DEFAULT
if (nonPartitionableResource && requestPartitionId == null) {
requestPartitionId = RequestPartitionId.defaultPartition();
}
return validateNormalizeAndNotifyHooksForRead(requestPartitionId, theRequest, theResourceType);
validateRequestPartitionNotNull(
requestPartitionId,
Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE,
Pointcut.STORAGE_PARTITION_IDENTIFY_ANY);
validatePartitionForCreate(requestPartitionId, theResourceType);
return validateAndNormalizePartition(requestPartitionId, requestDetails, theResourceType);
}
private boolean systemRequestHasExplicitPartition(@Nonnull SystemRequestDetails theRequest) {
@ -288,7 +296,7 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
* If the partition has both, they are validated to ensure that they correspond.
*/
@Nonnull
private RequestPartitionId validateNormalizeAndNotifyHooksForRead(
private RequestPartitionId validateAndNormalizePartition(
@Nonnull RequestPartitionId theRequestPartitionId,
RequestDetails theRequest,
@Nullable String theResourceType) {
@ -330,25 +338,32 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Override
public boolean isResourcePartitionable(String theResourceType) {
return !myNonPartitionableResourceNames.contains(theResourceType);
return theResourceType != null && !myNonPartitionableResourceNames.contains(theResourceType);
}
private void validateSinglePartitionForCreate(
RequestPartitionId theRequestPartitionId, @Nonnull String theResourceName, Pointcut thePointcut) {
validateRequestPartitionNotNull(theRequestPartitionId, thePointcut);
private boolean isResourceNonPartitionable(String theResourceType) {
return theResourceType != null && !isResourcePartitionable(theResourceType);
}
private void validatePartitionForCreate(RequestPartitionId theRequestPartitionId, String theResourceName) {
if (theRequestPartitionId.hasPartitionIds()) {
validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionIds());
validateSinglePartitionIdOrName(theRequestPartitionId.getPartitionIds());
}
validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionNames());
validateSinglePartitionIdOrName(theRequestPartitionId.getPartitionNames());
// Make sure we're not using one of the conformance resources in a non-default partition
if (theRequestPartitionId.isDefaultPartition() || theRequestPartitionId.isAllPartitions()) {
return;
}
// TODO MM: check if we need to validate using the configured value PartitionSettings.defaultPartition
// however that is only used for read and not for create at the moment
if ((theRequestPartitionId.hasPartitionIds()
&& !theRequestPartitionId.getPartitionIds().contains(null))
|| (theRequestPartitionId.hasPartitionNames()
&& !theRequestPartitionId.getPartitionNames().contains(JpaConstants.DEFAULT_PARTITION_NAME))) {
if (!isResourcePartitionable(theResourceName)) {
if (isResourceNonPartitionable(theResourceName)) {
String msg = myFhirContext
.getLocalizer()
.getMessageSanitized(
@ -360,14 +375,15 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
}
}
private void validateRequestPartitionNotNull(RequestPartitionId theRequestPartitionId, Pointcut theThePointcut) {
private static void validateRequestPartitionNotNull(
RequestPartitionId theRequestPartitionId, Pointcut... thePointcuts) {
if (theRequestPartitionId == null) {
throw new InternalErrorException(
Msg.code(1319) + "No interceptor provided a value for pointcut: " + theThePointcut);
Msg.code(1319) + "No interceptor provided a value for pointcuts: " + Arrays.toString(thePointcuts));
}
}
private void validateSinglePartitionIdOrNameForCreate(@Nullable List<?> thePartitionIds) {
private static void validateSinglePartitionIdOrName(@Nullable List<?> thePartitionIds) {
if (thePartitionIds != null && thePartitionIds.size() != 1) {
throw new InternalErrorException(
Msg.code(1320) + "RequestPartitionId must contain a single partition for create operations, found: "