Clean up and enhance batch2 jobs to support running them across multiple partitions.

This commit is contained in:
Martha 2024-06-27 15:56:09 -07:00
parent 8a4e5ca673
commit ec8aad3e78
54 changed files with 992 additions and 534 deletions

View File

@ -204,8 +204,6 @@ ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.invalidName=Partition name "{0}
ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDuplicatePartitionName=Partition name "{0}" is already defined
ca.uhn.fhir.jpa.partition.PartitionLookupSvcImpl.cantCreateDefaultPartition=Can not create partition with name "DEFAULT"
ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor.unknownTenantName=Unknown tenant: {0}
ca.uhn.fhir.jpa.dao.HistoryBuilder.noSystemOrTypeHistoryForPartitionAwareServer=Type- and Server- level history operation not supported across partitions on partitioned server
ca.uhn.fhir.jpa.provider.DiffProvider.cantDiffDifferentTypes=Unable to diff two resources of different types

View File

@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
@ -277,7 +278,8 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails) {
public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails,
ReadPartitionIdRequestDetails theDetails) {
// Just to be nice, figure out the first line in the stack that isn't a part of the
// partitioning or interceptor infrastructure, just so it's obvious who is asking

View File

@ -2883,6 +2883,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
createPatient(withPartition(1), withActiveTrue());
myCaptureQueriesListener.clear();
addReadDefaultPartition();
Bundle outcome = mySystemDao.transaction(mySrd, input.get());
ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome));
myCaptureQueriesListener.logSelectQueriesForCurrentThread();

View File

@ -15,6 +15,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
@ -40,9 +42,10 @@ public class ReindexStepTest {
public void testMethodReindex_withRequestPartitionId_willExecuteWithPartitionId(){
// given
Integer expectedPartitionId = 1;
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(expectedPartitionId);
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(List.of(), partitionId);
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
reindexJobParameters.setRequestPartitionId(RequestPartitionId.fromPartitionId(expectedPartitionId));
reindexJobParameters.setRequestPartitionId(partitionId);
when(myHapiTransactionService.withRequest(any())).thenCallRealMethod();
when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();

View File

@ -20,8 +20,8 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class DeleteExpungeJobTest extends BaseJpaR4Test {
@Autowired
@ -62,7 +62,8 @@ public class DeleteExpungeJobTest extends BaseJpaR4Test {
assertEquals(2, myDiagnosticReportDao.search(SearchParameterMap.newSynchronous()).size());
DeleteExpungeJobParameters jobParameters = new DeleteExpungeJobParameters();
jobParameters.addUrl("Observation?subject.active=false").addUrl("DiagnosticReport?subject.active=false");
jobParameters.addUrl("Observation?subject.active=false");
jobParameters.addUrl("DiagnosticReport?subject.active=false");
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setParameters(jobParameters);

View File

@ -311,6 +311,29 @@ public class ReindexJobTest extends BaseJpaR4Test {
myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(reindexPropertyCache);
}
@Test
public void testReindex_byMultipleUrls_indexesMatchingResources() {
// setup
createObservation(withStatus(Observation.ObservationStatus.FINAL.toCode()));
createObservation(withStatus(Observation.ObservationStatus.CANCELLED.toCode()));
createPatient(withActiveTrue());
createPatient(withActiveFalse());
// Only reindex one of the Observations, plus all Patients
ReindexJobParameters parameters = new ReindexJobParameters();
parameters.addUrl("Observation?status=final");
parameters.addUrl("Patient?");
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(3);
}
@Test
public void testReindexDeletedResources_byUrl_willRemoveDeletedResourceEntriesFromIndexTables(){
IIdType obsId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);

View File

@ -0,0 +1,100 @@
package ca.uhn.fhir.jpa.delete.job;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
private final RequestTenantPartitionInterceptor myPartitionInterceptor = new RequestTenantPartitionInterceptor();
@BeforeEach
public void before() {
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(true);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("TestPartition1"), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName("TestPartition2"), null);
RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
Observation observation1 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode()));
myObservationDao.create(observation1, new SystemRequestDetails().setRequestPartitionId(partition1));
Observation observation2 = buildResource("Observation", withStatus(Observation.ObservationStatus.REGISTERED.toCode()));
myObservationDao.create(observation2, new SystemRequestDetails().setRequestPartitionId(partition1));
Observation observation3 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode()));
myObservationDao.create(observation3, new SystemRequestDetails().setRequestPartitionId(partition2));
Patient patient1 = buildResource("Patient", withActiveTrue());
myPatientDao.create(patient1, new SystemRequestDetails().setRequestPartitionId(partition1));
Patient patient2 = buildResource("Patient", withActiveFalse());
myPatientDao.create(patient2, new SystemRequestDetails().setRequestPartitionId(partition2));
}
@AfterEach
public void after() {
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
}
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
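// resource data created in before(): partition 1 holds two Observations (FINAL, REGISTERED) and one Patient; partition 2 holds one Observation (FINAL) and one Patient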
return Stream.of(
Arguments.of(List.of(), List.of(), false, 5),
Arguments.of(List.of("Observation?"), allPartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 0),
Arguments.of(List.of("Observation?"), partition1, true, 2),
Arguments.of(List.of("Observation?", "Patient?"), allPartitions, false, 5),
Arguments.of(List.of("Observation?", "Patient?"), allPartitions, true, 3),
Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, false, 4),
Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, true, 2),
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void testReindex_byMultipleUrlsAndPartitions_indexesMatchingResources(List<String> theUrls,
List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl,
int theExpectedIndexedResourceCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
// verify only resources matching URLs and partitions provided via parameters were re-indexed
assertThat(jobInstance.getCombinedRecordsProcessed()).isEqualTo(theExpectedIndexedResourceCount);
}
}

View File

@ -200,7 +200,7 @@ public class PartitioningInterceptorR4Test extends BaseJpaR4SystemTest {
myPatientDao.search(map);
fail();
} catch (InternalErrorException e) {
assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcut: STORAGE_PARTITION_IDENTIFY_READ", e.getMessage());
assertEquals(Msg.code(1319) + "No interceptor provided a value for pointcuts: [STORAGE_PARTITION_IDENTIFY_ANY, STORAGE_PARTITION_IDENTIFY_READ]", e.getMessage());
}
}

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.rest.server.tenant.ITenantIdentificationStrategy;
import jakarta.annotation.Nonnull;
@ -57,6 +58,7 @@ public class RequestTenantPartitionInterceptor {
// We will use the tenant ID that came from the request as the partition name
String tenantId = theRequestDetails.getTenantId();
if (isBlank(tenantId)) {
// this branch is a no-op that can happen when "partitioning.tenant_identification_strategy" is set to URL_BASED
if (theRequestDetails instanceof SystemRequestDetails) {
SystemRequestDetails requestDetails = (SystemRequestDetails) theRequestDetails;
if (requestDetails.getRequestPartitionId() != null) {
@ -67,6 +69,9 @@ public class RequestTenantPartitionInterceptor {
throw new InternalErrorException(Msg.code(343) + "No partition ID has been specified");
}
if (tenantId.equals(ProviderConstants.ALL_PARTITIONS_NAME)) {
return RequestPartitionId.allPartitions();
}
return RequestPartitionId.fromPartitionName(tenantId);
}
}
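With the change above, the reserved tenant name "_ALL" (ProviderConstants.ALL_PARTITIONS_NAME) resolves to all partitions instead of being looked up as a partition name. As a hedged illustration only, assuming URL-based tenant identification where the tenant is the first path segment and using a made-up base URL, a request such as

POST https://example.org/fhir/_ALL/$reindex

would cause this interceptor to return RequestPartitionId.allPartitions().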

View File

@ -53,6 +53,7 @@ public class ProviderConstants {
public static final String PARTITION_MANAGEMENT_PARTITION_DESC = "description";
public static final String DEFAULT_PARTITION_NAME = "DEFAULT";
public static final String ALL_PARTITIONS_NAME = "_ALL";
/**
* Operation name: diff

View File

@ -19,7 +19,7 @@
*/
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
@ -59,7 +59,7 @@ public class DeleteExpungeAppCtx {
.addFirstStep(
"generate-ranges",
"Generate data ranges to expunge",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
expungeGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",

View File

@ -19,10 +19,10 @@
*/
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DeleteExpungeJobParameters extends PartitionedUrlListJobParameters {
public class DeleteExpungeJobParameters extends JobParameters {
@JsonProperty("cascade")
private boolean myCascade;

View File

@ -26,10 +26,10 @@ import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.util.ValidateUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class DeleteExpungeJobParametersValidator implements IJobParametersValidator<DeleteExpungeJobParameters> {
private final IUrlListValidator myUrlListValidator;

View File

@ -159,10 +159,9 @@ public class BulkDataImportProvider {
RequestPartitionId partitionId =
myRequestPartitionHelperService.determineReadPartitionForRequestForServerOperation(
theRequestDetails, JpaConstants.OPERATION_IMPORT);
if (!partitionId.isAllPartitions()) {
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
jobParameters.setPartitionId(partitionId);
}
// TODO MM: I believe this is already checked as part of
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
jobParameters.setPartitionId(partitionId);
// Extract all the URLs and order them in the order that is least
// likely to result in conflict (e.g. Patients before Observations

View File

@ -20,9 +20,10 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
@ -31,7 +32,6 @@ import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -53,7 +53,7 @@ public class ReindexAppCtx {
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
reindexGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",
@ -65,13 +65,12 @@ public class ReindexAppCtx {
}
@Bean
public IJobStepWorker<ReindexJobParameters, VoidModel, PartitionedUrlChunkRangeJson>
reindexGenerateRangeChunksStep() {
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean
public IJobStepWorker<ReindexJobParameters, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> loadIdsStep(
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> loadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@ -91,8 +90,8 @@ public class ReindexAppCtx {
public ReindexProvider reindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobPartitionProvider theJobPartitionHandler,
UrlPartitioner theUrlPartitioner) {
return new ReindexProvider(theFhirContext, theJobCoordinator, theRequestPartitionHelperSvc, theUrlPartitioner);
return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler, theUrlPartitioner);
}
}

View File

@ -19,14 +19,14 @@
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexJobParameters extends PartitionedUrlListJobParameters {
public class ReindexJobParameters extends JobParameters {
public static final String OPTIMIZE_STORAGE = "optimizeStorage";
public static final String REINDEX_SEARCH_PARAMETERS = "reindexSearchParameters";

View File

@ -20,13 +20,12 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
@ -50,7 +49,7 @@ public class ReindexProvider {
private final FhirContext myFhirContext;
private final IJobCoordinator myJobCoordinator;
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private final IJobPartitionProvider myJobPartitionProvider;
private final UrlPartitioner myUrlPartitioner;
/**
@ -59,11 +58,11 @@ public class ReindexProvider {
public ReindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobPartitionProvider theJobPartitionProvider,
UrlPartitioner theUrlPartitioner) {
myFhirContext = theFhirContext;
myJobCoordinator = theJobCoordinator;
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
myJobPartitionProvider = theJobPartitionProvider;
myUrlPartitioner = theUrlPartitioner;
}
@ -128,10 +127,9 @@ public class ReindexProvider {
.forEach(params::addPartitionedUrl);
}
RequestPartitionId requestPartition =
myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, ProviderConstants.OPERATION_REINDEX);
params.setRequestPartitionId(requestPartition);
myJobPartitionProvider
.getPartitions(theRequestDetails, ProviderConstants.OPERATION_REINDEX)
.forEach(params::addRequestPartitionId);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);

View File

@ -103,7 +103,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(theJobParameters.getRequestPartitionId())
.withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob);
return new RunOutcome(data.size());

View File

@ -37,8 +37,6 @@ public class ReindexJobParametersValidatorTest {
parameters.addUrl(theUrl);
// test
List<String> errors = myValidator.validate(null, parameters);
return errors;
return myValidator.validate(null, parameters);
}
}

View File

@ -1,9 +1,7 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@ -33,9 +31,15 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNotNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -59,6 +63,8 @@ public class ReindexProviderTest {
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private UrlPartitioner myUrlPartitioner;
@Mock
private IJobPartitionProvider myJobPartitionProvider;
@Captor
private ArgumentCaptor<JobInstanceStartRequest> myStartRequestCaptor;
@ -72,7 +78,7 @@ public class ReindexProviderTest {
when(myJobCoordinator.startInstance(isNotNull(), any()))
.thenReturn(createJobStartResponse());
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(any(), any())).thenReturn(RequestPartitionId.allPartitions());
when(myJobPartitionProvider.getPartitions(any(), any())).thenReturn(List.of(RequestPartitionId.allPartitions()));
}
private Batch2JobStartResponse createJobStartResponse() {
@ -96,11 +102,7 @@ public class ReindexProviderTest {
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(url);
partitionedUrl.setRequestPartitionId(RequestPartitionId.defaultPartition());
when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(partitionedUrl);
when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(new PartitionedUrl().setUrl(url).setRequestPartitionId(RequestPartitionId.defaultPartition()));
// Execute

View File

@ -0,0 +1,24 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* Provides the list of partitions that a job should run against.
* TODO MM: Consider moving UrlPartitioner calls to this class once other operations need to be MegaScale enabled.
* That way all partitioning-related logic for batch jobs exists only here.
* After that, PartitionedUrl#myRequestPartitionId can be marked as deprecated.
*/
public interface IJobPartitionProvider {
/**
* Provides the list of partitions to run job steps against, based on the request that initiates the job.
* @param theRequestDetails the requestDetails
* @param theOperation the operation being run which corresponds to the job
* @return the list of partitions
*/
List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation);
// List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation, String theUrls);
}
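The interface above is the new extension point that decides which partitions a batch2 job fans out to. A minimal sketch of a custom provider, assuming a deployment that always targets a fixed set of partitions (the class name and partition IDs are illustrative and not part of this commit):

import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;

import java.util.List;

public class FixedPartitionsJobPartitionProvider implements IJobPartitionProvider {
	@Override
	public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
		// illustrative: ignore the request and always run job steps against partitions 1 and 2
		return List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
	}
}

Since BaseBatch2Config (further down in this commit) exposes jobPartitionProvider as a @Bean, overriding that bean would be one plausible way to wire such a provider in.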

View File

@ -21,11 +21,13 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
@ -33,6 +35,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -139,4 +142,9 @@ public abstract class BaseBatch2Config {
protected int getConcurrentConsumers() {
return 4;
}
@Bean
public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return new JobPartitionProvider(theRequestPartitionHelperSvc);
}
}

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* The default implementation, which uses {@link IRequestPartitionHelperSvc} to compute the partition a batch2 job should run against.
*/
public class JobPartitionProvider implements IJobPartitionProvider {
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public JobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
return List.of(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation));
}
}

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.batch2.jobs.chunk;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.util.JsonDateDeserializer;
import ca.uhn.fhir.rest.server.util.JsonDateSerializer;
@ -26,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Date;
@ -42,23 +44,61 @@ public class ChunkRangeJson implements IModelJson {
@Nonnull
private Date myEnd;
@Nullable
@JsonProperty("url")
private String myUrl;
@JsonProperty("resourceType")
private String myResourceType;
@Nullable
@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;
public ChunkRangeJson() {}
public ChunkRangeJson(@Nonnull Date theStart, @Nonnull Date theEnd) {
this.myStart = theStart;
this.myEnd = theEnd;
}
@Nonnull
public Date getStart() {
return myStart;
}
public ChunkRangeJson setStart(@Nonnull Date theStart) {
myStart = theStart;
return this;
}
@Nonnull
public Date getEnd() {
return myEnd;
}
public ChunkRangeJson setEnd(@Nonnull Date theEnd) {
myEnd = theEnd;
return this;
}
@Nullable
public String getUrl() {
return myUrl;
}
public ChunkRangeJson setUrl(@Nullable String theUrl) {
myUrl = theUrl;
return this;
}
@Nonnull
public String getResourceType() {
return myResourceType;
}
public ChunkRangeJson setResourceType(@Nullable String theResourceType) {
myResourceType = theResourceType;
return this;
}
@Nullable
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public ChunkRangeJson setPartitionId(@Nullable RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
return this;
}
}
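ChunkRangeJson now carries the URL and partition on the chunk itself, so downstream steps can pick the partition up from the work chunk rather than from the job parameters. A small sketch of building a constrained range (the URL and partition ID are illustrative; BATCH_START_DATE is the Batch2Utils constant used elsewhere in this commit):

ChunkRangeJson range = new ChunkRangeJson(Batch2Utils.BATCH_START_DATE, new Date())
		.setUrl("Observation?status=final")
		.setPartitionId(RequestPartitionId.fromPartitionId(1));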

View File

@ -1,39 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.chunk;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
public class PartitionedUrlChunkRangeJson extends ChunkRangeJson {
@Nullable
@JsonProperty("partitionedUrl")
private PartitionedUrl myPartitionedUrl;
@Nullable
public PartitionedUrl getPartitionedUrl() {
return myPartitionedUrl;
}
public void setPartitionedUrl(@Nullable PartitionedUrl thePartitionedUrl) {
myPartitionedUrl = thePartitionedUrl;
}
}

View File

@ -0,0 +1,116 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
/**
* Can be used to configure parameters for batch2 jobs.
* Please note that these need to be backward compatible as we do not have a way to migrate them to a different structure at the moment.
*/
public class JobParameters implements IModelJson {
@JsonProperty(value = "partitionId")
private List<RequestPartitionId> myRequestPartitionIds;
@JsonProperty("batchSize")
private Integer myBatchSize;
@JsonProperty("partitionedUrl")
private List<PartitionedUrl> myPartitionedUrls;
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
if (theRequestPartitionId != null) {
myRequestPartitionIds = List.of(theRequestPartitionId);
}
}
@Nullable
public RequestPartitionId getRequestPartitionId() {
return getFirstRequestPartitionIdOrNull();
}
@Nullable
private RequestPartitionId getFirstRequestPartitionIdOrNull() {
return myRequestPartitionIds == null || myRequestPartitionIds.isEmpty() ? null : myRequestPartitionIds.get(0);
}
@Nonnull
public List<RequestPartitionId> getRequestPartitionIds() {
if (myRequestPartitionIds == null) {
myRequestPartitionIds = new ArrayList<>();
}
return myRequestPartitionIds;
}
public void addRequestPartitionId(RequestPartitionId theRequestPartitionId) {
getRequestPartitionIds().add(theRequestPartitionId);
}
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}
@Nullable
public Integer getBatchSize() {
return myBatchSize;
}
public List<PartitionedUrl> getPartitionedUrls() {
if (myPartitionedUrls == null) {
myPartitionedUrls = new ArrayList<>();
}
return myPartitionedUrls;
}
public void addPartitionedUrl(@Nonnull PartitionedUrl theUrl) {
getPartitionedUrls().add(theUrl);
}
public void addUrl(@Nonnull String theUrl) {
getPartitionedUrls().add(new PartitionedUrl().setUrl(theUrl));
}
@VisibleForTesting
public static JobParameters from(
List<String> theUrls, List<RequestPartitionId> thePartitions, boolean theShouldAssignPartitionToUrl) {
JobParameters parameters = new JobParameters();
if (theShouldAssignPartitionToUrl) {
assert theUrls.size() == thePartitions.size();
for (int i = 0; i < theUrls.size(); i++) {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(theUrls.get(i));
partitionedUrl.setRequestPartitionId(thePartitions.get(i));
parameters.addPartitionedUrl(partitionedUrl);
}
} else {
theUrls.forEach(url -> parameters.addPartitionedUrl(new PartitionedUrl().setUrl(url)));
thePartitions.forEach(parameters::addRequestPartitionId);
}
return parameters;
}
}
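To illustrate how the two partition-bearing lists are meant to be used (a hedged sketch mirroring the from(...) helper above; the URL and partition ID are illustrative): a partition can either be supplied independently of the URLs, or bound to a specific URL through a PartitionedUrl.

// partition supplied independently of the URLs
JobParameters params = new JobParameters();
params.addUrl("Observation?");
params.addRequestPartitionId(RequestPartitionId.fromPartitionId(1));

// partition bound directly to one URL
JobParameters boundParams = new JobParameters();
boundParams.addPartitionedUrl(new PartitionedUrl()
		.setUrl("Observation?")
		.setRequestPartitionId(RequestPartitionId.fromPartitionId(1)));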

View File

@ -1,53 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
public class PartitionedJobParameters implements IModelJson {
@JsonProperty(value = "partitionId")
@Nullable
private RequestPartitionId myRequestPartitionId;
@JsonProperty("batchSize")
@Nullable
private Integer myBatchSize;
@Nullable
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}
@Nullable
public Integer getBatchSize() {
return myBatchSize;
}
}

View File

@ -48,15 +48,17 @@ public class PartitionedUrl implements IModelJson {
return myUrl;
}
public void setUrl(String theUrl) {
public PartitionedUrl setUrl(String theUrl) {
myUrl = theUrl;
return this;
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
public PartitionedUrl setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
return this;
}
}

View File

@ -1,53 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.parameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.List;
public class PartitionedUrlListJobParameters extends PartitionedJobParameters {
@JsonProperty("partitionedUrl")
@Nullable
private List<PartitionedUrl> myPartitionedUrls;
public List<PartitionedUrl> getPartitionedUrls() {
if (myPartitionedUrls == null) {
myPartitionedUrls = new ArrayList<>();
}
return myPartitionedUrls;
}
public PartitionedUrlListJobParameters addPartitionedUrl(@Nonnull PartitionedUrl thePartitionedUrl) {
Validate.notNull(thePartitionedUrl);
getPartitionedUrls().add(thePartitionedUrl);
return this;
}
public PartitionedUrlListJobParameters addUrl(@Nonnull String theUrl) {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(theUrl);
return addPartitionedUrl(partitionedUrl);
}
}

View File

@ -0,0 +1,50 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
public class ChunkProducer implements IIdChunkProducer<ChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2DaoSvc myBatch2DaoSvc;
public ChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) {
myBatch2DaoSvc = theBatch2DaoSvc;
}
@Override
public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) {
String theUrl = theData.getUrl();
RequestPartitionId targetPartitionId = theData.getPartitionId();
ourLog.info(
"Fetching resource ID chunk in partition {} for URL {} - Range {} - {}",
targetPartitionId,
theUrl,
theData.getStart(),
theData.getEnd());
return myBatch2DaoSvc.fetchResourceIdStream(theData.getStart(), theData.getEnd(), targetPartitionId, theUrl);
}
}

View File

@ -25,49 +25,103 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.thymeleaf.util.StringUtils;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters>
implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> {
public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirstJobStepWorker<PT, ChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink)
@Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
throws JobExecutionFailedException {
PT params = theStepExecutionDetails.getParameters();
Date start = BATCH_START_DATE;
Date end = new Date();
if (params.getPartitionedUrls().isEmpty()) {
ourLog.info("Searching for All Resources from {} to {}", start, end);
PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
} else {
for (PartitionedUrl nextPartitionedUrl : params.getPartitionedUrls()) {
ourLog.info("Searching for [{}]] from {} to {}", nextPartitionedUrl, start, end);
PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();
nextRange.setPartitionedUrl(nextPartitionedUrl);
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
// partitions can be configured in either of the following lists, both of which are optional
// the code below considers all use-cases
// the logic can be simplified once PartitionedUrl.myRequestPartitionId is deprecated
// @see IJobPartitionProvider
List<RequestPartitionId> partitionIds = params.getRequestPartitionIds();
List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls();
if (partitionIds.isEmpty()) {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
sendChunk(chunkRangeJson, theDataSink);
return RunOutcome.SUCCESS;
}
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId partitionId = partitionedUrl.getRequestPartitionId();
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(partitionId);
sendChunk(chunkRangeJson, theDataSink);
});
return RunOutcome.SUCCESS;
}
partitionIds.forEach(partitionId -> {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end).setPartitionId(partitionId);
sendChunk(chunkRangeJson, theDataSink);
return;
}
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId urlPartitionId = partitionedUrl.getRequestPartitionId();
RequestPartitionId narrowPartitionId = determineNarrowPartitionId(partitionId, urlPartitionId);
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(narrowPartitionId);
sendChunk(chunkRangeJson, theDataSink);
});
});
return RunOutcome.SUCCESS;
}
private RequestPartitionId determineNarrowPartitionId(
@Nonnull RequestPartitionId theRequestPartitionId,
@Nullable RequestPartitionId theOtherRequestPartitionId) {
if (theOtherRequestPartitionId == null) {
return theRequestPartitionId;
}
if (theRequestPartitionId.isAllPartitions() && !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
if (theRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
return theRequestPartitionId;
}
private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) {
String url = theData.getUrl();
ourLog.info(
"Creating chunks for [{}] from {} to {} for partition {}",
!StringUtils.isEmpty(url) ? url : "everything",
theData.getStart(),
theData.getEnd(),
theData.getPartitionId());
theDataSink.accept(theData);
}
}
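Taken together, the branches above emit one chunk per (request partition ID, partitioned URL) pair when both lists are populated, one chunk per element when only one list is populated, and a single unconstrained chunk when both are empty. For example, three request partition IDs combined with two URLs that carry no partition of their own produce 3 × 2 = 6 chunks, which is one of the cases exercised by GenerateRangeChunksStepTest later in this commit.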

View File

@ -27,13 +27,18 @@ import jakarta.annotation.Nullable;
import java.util.Date;
/**
* A service that produces pages of resource pids based on the data provided by a previous batch step. Typically the
* A service that produces pages of resource pids based on the data provided by a previous batch step. Typically, the
* first step in a batch job produces work chunks that define what types of data the batch operation will be performing
* (e.g. a list of resource types and date ranges). This service is then used by the second step to actually query and
* page through resource pids based on the chunk definitions produced by the first step.
* @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc).
* @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc.).
*/
public interface IIdChunkProducer<IT extends ChunkRangeJson> {
IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData);
@Deprecated(since = "7.3.7")
default IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData) {
return fetchResourceIdStream(theData);
}
IResourcePidStream fetchResourceIdStream(IT theData);
}

View File

@ -24,26 +24,25 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import jakarta.annotation.Nonnull;
public class LoadIdsStep<PT extends PartitionedUrlListJobParameters>
implements IJobStepWorker<PT, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<PT, PartitionedUrlChunkRangeJson> myResourceIdListStep;
public class LoadIdsStep<PT extends JobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<PT> myResourceIdListStep;
public LoadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
IIdChunkProducer<PartitionedUrlChunkRangeJson> idChunkProducer =
new PartitionedUrlListIdChunkProducer(theBatch2DaoSvc);
IIdChunkProducer<ChunkRangeJson> idChunkProducer = new ChunkProducer(theBatch2DaoSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<PT, PartitionedUrlChunkRangeJson> theStepExecutionDetails,
@Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);

View File

@ -1,67 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import java.util.Date;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class PartitionedUrlListIdChunkProducer implements IIdChunkProducer<PartitionedUrlChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2DaoSvc myBatch2DaoSvc;
public PartitionedUrlListIdChunkProducer(IBatch2DaoSvc theBatch2DaoSvc) {
myBatch2DaoSvc = theBatch2DaoSvc;
}
@Override
public IResourcePidStream fetchResourceIdStream(
Date theStart,
Date theEnd,
@Nullable RequestPartitionId theRequestPartitionId,
PartitionedUrlChunkRangeJson theData) {
PartitionedUrl partitionedUrl = theData.getPartitionedUrl();
RequestPartitionId targetPartitionId;
String theUrl;
if (partitionedUrl == null) {
theUrl = null;
targetPartitionId = theRequestPartitionId;
ourLog.info("Fetching resource ID chunk for everything - Range {} - {}", theStart, theEnd);
} else {
theUrl = partitionedUrl.getUrl();
targetPartitionId = defaultIfNull(partitionedUrl.getRequestPartitionId(), theRequestPartitionId);
ourLog.info(
"Fetching resource ID chunk for URL {} - Range {} - {}", partitionedUrl.getUrl(), theStart, theEnd);
}
return myBatch2DaoSvc.fetchResourceIdStream(theStart, theEnd, targetPartitionId, theUrl);
}
}

View File

@ -27,7 +27,7 @@ import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.util.Logs;
@ -42,39 +42,35 @@ import java.util.stream.Stream;
import static ca.uhn.fhir.util.StreamUtil.partition;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends ChunkRangeJson>
implements IJobStepWorker<PT, IT, ResourceIdListWorkChunkJson> {
public class ResourceIdListStep<PT extends JobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
protected static final int MAX_BATCH_OF_IDS = 500;
private final IIdChunkProducer<IT> myIdChunkProducer;
private final IIdChunkProducer<ChunkRangeJson> myIdChunkProducer;
public ResourceIdListStep(IIdChunkProducer<IT> theIdChunkProducer) {
public ResourceIdListStep(IIdChunkProducer<ChunkRangeJson> theIdChunkProducer) {
myIdChunkProducer = theIdChunkProducer;
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<PT, IT> theStepExecutionDetails,
@Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
IT data = theStepExecutionDetails.getData();
ChunkRangeJson data = theStepExecutionDetails.getData();
Date start = data.getStart();
Date end = data.getEnd();
Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize();
ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
RequestPartitionId requestPartitionId =
theStepExecutionDetails.getParameters().getRequestPartitionId();
ourLog.info("Beginning to submit chunks in range {} to {}", start, end);
int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS);
final IResourcePidStream searchResult = myIdChunkProducer.fetchResourceIdStream(
start, end, requestPartitionId, theStepExecutionDetails.getData());
final IResourcePidStream searchResult =
myIdChunkProducer.fetchResourceIdStream(theStepExecutionDetails.getData());
searchResult.visitStreamNoResult(typedResourcePidStream -> {
AtomicInteger totalIdsFound = new AtomicInteger();
@ -101,7 +97,7 @@ public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends
if (theTypedPids.isEmpty()) {
return;
}
ourLog.info("Submitting work chunk with {} IDs", theTypedPids.size());
ourLog.info("Submitting work chunk in partition {} with {} IDs", theRequestPartitionId, theTypedPids.size());
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(theTypedPids, theRequestPartitionId);
ourLog.debug("IDs are: {}", data);
theDataSink.accept(data);

View File

@ -82,6 +82,12 @@ public class JobInstanceStartRequest implements IModelJson {
myParameters = theParameters;
}
/**
* Sets the parameters for the job.
* Please note that these need to be backward compatible as we do not have a way to migrate them to a different structure at the moment.
* @param theParameters the parameters
* @return the current instance.
*/
public JobInstanceStartRequest setParameters(IModelJson theParameters) {
myParameters = JsonUtil.serializeWithSensitiveData(theParameters);
return this;

View File

@ -0,0 +1,42 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@InjectMocks
private JobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(eq(requestDetails), eq(operation))).thenReturn(partitionId);
// test
List <RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
assertThat(partitionIds).hasSize(1);
assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
}

View File

@ -0,0 +1,53 @@
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Date;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class ChunkProducerTest {
@Mock
private IBatch2DaoSvc myBatch2DaoSvc;
@InjectMocks
private ChunkProducer myChunkProducer;
@Test
public void fetchResourceIdStream_worksAsExpected() {
// setup
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
String url = "Patient?";
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(BATCH_START_DATE, new Date()).setPartitionId(partitionId).setUrl(url);
IResourcePidStream stream = mock(IResourcePidStream.class);
when(myBatch2DaoSvc.fetchResourceIdStream(any(), any(), any(), any())).thenReturn(stream);
// test
IResourcePidStream actualStream = myChunkProducer.fetchResourceIdStream(chunkRangeJson);
// verify
assertThat(actualStream).isSameAs(stream);
ArgumentCaptor<Date> dateCaptor = ArgumentCaptor.forClass(Date.class);
ArgumentCaptor<RequestPartitionId> partitionCaptor = ArgumentCaptor.forClass(RequestPartitionId.class);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
verify(myBatch2DaoSvc).fetchResourceIdStream(dateCaptor.capture(), dateCaptor.capture(), partitionCaptor.capture(), urlCaptor.capture());
assertThat(dateCaptor.getAllValues()).containsExactly(chunkRangeJson.getStart(), chunkRangeJson.getEnd());
assertThat(partitionCaptor.getValue()).isEqualTo(partitionId);
assertThat(urlCaptor.getValue()).isEqualTo(url);
}
}

View File

@ -0,0 +1,140 @@
package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class GenerateRangeChunksStepTest {
private final GenerateRangeChunksStep<JobParameters> myStep = new GenerateRangeChunksStep<>();
@Mock
private StepExecutionDetails<JobParameters, VoidModel> myStepExecutionDetails;
@Mock
private IJobDataSink<ChunkRangeJson> myJobDataSink;
private static final Date START = BATCH_START_DATE;
private static final Date END = new Date();
@BeforeEach
void setUp() {
}
@AfterEach
void tearDown() {
}
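// Arguments: URLs to chunk, target partitions, whether each URL is paired with the partition at the same index, and the expected chunk count.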
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> threePartitions = List.of(
RequestPartitionId.fromPartitionId(1),
RequestPartitionId.fromPartitionId(2),
RequestPartitionId.fromPartitionId(3)
);
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
// The actual values (URLs, partition IDs) don't matter, but we use values similar to real hapi-fhir use-cases
return Stream.of(
Arguments.of(List.of(), List.of(), false, 1),
Arguments.of(List.of(), partition1, false, 1),
Arguments.of(List.of("Observation?"), threePartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 1),
Arguments.of(List.of("Observation?"), partition1, true, 1),
Arguments.of(List.of("Observation?", "Patient?"), threePartitions, false, 6),
Arguments.of(List.of("Observation?", "Patient?", "Practitioner?"), threePartitions, true, 3),
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2),
Arguments.of(List.of("Observation?status=final"), threePartitions, false, 2)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void run_withParameters_producesExpectedChunks(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
when(myStepExecutionDetails.getParameters()).thenReturn(parameters);
myStep.run(myStepExecutionDetails, myJobDataSink);
ArgumentCaptor<ChunkRangeJson> captor = ArgumentCaptor.forClass(ChunkRangeJson.class);
verify(myJobDataSink, times(theExpectedChunkCount)).accept(captor.capture());
List<ChunkRangeJson> chunkRangeJsonList = getExpectedChunkList(theUrls, thePartitions, theShouldAssignPartitionToUrl, theExpectedChunkCount);
RequestPartitionId[] actualPartitionIds = captor.getAllValues().stream().map(ChunkRangeJson::getPartitionId).toArray(RequestPartitionId[]::new);
RequestPartitionId[] expectedPartitionIds = chunkRangeJsonList.stream().map(ChunkRangeJson::getPartitionId).toArray(RequestPartitionId[]::new);
assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds);
String[] actualUrls = captor.getAllValues().stream().map(ChunkRangeJson::getUrl).toArray(String[]::new);
String[] expectedUrls = chunkRangeJsonList.stream().map(ChunkRangeJson::getUrl).toArray(String[]::new);
assertThat(actualUrls).containsExactlyInAnyOrder(expectedUrls);
}
private List<ChunkRangeJson> getExpectedChunkList(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
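// Mirrors the step's chunking rules: when partitions are assigned to URLs they are paired by index; otherwise one chunk is expected per URL/partition combination, falling back to per-URL or per-partition chunks when the other list is empty.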
List<ChunkRangeJson> chunkRangeJsonList = new ArrayList<>();
if (theShouldAssignPartitionToUrl) {
for (int i = 0; i < theExpectedChunkCount; i++) {
String url = theUrls.get(i);
RequestPartitionId partition = thePartitions.get(i);
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
if (theUrls.isEmpty() && thePartitions.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END);
chunkRangeJsonList.add(chunkRangeJson);
return chunkRangeJsonList;
}
if (theUrls.isEmpty()) {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
if (thePartitions.isEmpty()) {
for (String url : theUrls) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
theUrls.forEach(url -> {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
});
return chunkRangeJsonList;
}
}

View File

@ -2,11 +2,12 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
@ -48,11 +49,11 @@ public class LoadIdsStepTest {
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> mySink;
private LoadIdsStep mySvc;
private LoadIdsStep<JobParameters> mySvc;
@BeforeEach
public void before() {
mySvc = new LoadIdsStep(myBatch2DaoSvc);
mySvc = new LoadIdsStep<>(myBatch2DaoSvc);
}
@Captor
@ -60,18 +61,17 @@ public class LoadIdsStepTest {
@Test
public void testGenerateSteps() {
PartitionedUrlListJobParameters parameters = new PartitionedUrlListJobParameters();
PartitionedUrlChunkRangeJson range = new PartitionedUrlChunkRangeJson();
range.setStart(DATE_1).setEnd(DATE_END);
JobParameters parameters = new JobParameters();
ChunkRangeJson range = new ChunkRangeJson(DATE_1, DATE_END).setPartitionId(RequestPartitionId.allPartitions());
String instanceId = "instance-id";
JobInstance jobInstance = JobInstance.fromInstanceId(instanceId);
String chunkId = "chunk-id";
StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
StepExecutionDetails<JobParameters, ChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
// First Execution
when(myBatch2DaoSvc.fetchResourceIdStream(eq(DATE_1), eq(DATE_END), isNull(), isNull()))
.thenReturn(createIdChunk(0L, 20000L, DATE_2));
.thenReturn(createIdChunk());
mySvc.run(details, mySink);
@ -98,13 +98,12 @@ public class LoadIdsStepTest {
}
@Nonnull
private IResourcePidStream createIdChunk(long idLow, long idHigh, Date lastDate) {
private IResourcePidStream createIdChunk() {
List<IResourcePersistentId> ids = new ArrayList<>();
List<String> resourceTypes = new ArrayList<>();
for (long i = idLow; i < idHigh; i++) {
for (long i = 0; i < 20000; i++) {
ids.add(JpaPid.fromId(i));
}
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, lastDate, null);
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, DATE_2, null);
return new ListWrappingPidStream(chunk);
}

View File

@ -3,9 +3,10 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream;
@ -35,20 +36,20 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ResourceIdListStepTest {
@Mock
private IIdChunkProducer<PartitionedUrlChunkRangeJson> myIdChunkProducer;
private IIdChunkProducer<ChunkRangeJson> myIdChunkProducer;
@Mock
private StepExecutionDetails<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myStepExecutionDetails;
private StepExecutionDetails<JobParameters, ChunkRangeJson> myStepExecutionDetails;
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> myDataSink;
@Mock
private PartitionedUrlChunkRangeJson myData;
private ChunkRangeJson myData;
@Mock
private PartitionedUrlListJobParameters myParameters;
private JobParameters myParameters;
@Captor
private ArgumentCaptor<ResourceIdListWorkChunkJson> myDataCaptor;
private ResourceIdListStep<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myResourceIdListStep;
private ResourceIdListStep<JobParameters> myResourceIdListStep;
@BeforeEach
void beforeEach() {
@ -59,11 +60,13 @@ class ResourceIdListStepTest {
@ValueSource(ints = {0, 1, 100, 500, 501, 2345, 10500})
void testResourceIdListBatchSizeLimit(int theListSize) {
List<IResourcePersistentId> idList = generateIdList(theListSize);
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myStepExecutionDetails.getData()).thenReturn(myData);
when(myParameters.getBatchSize()).thenReturn(500);
when(myStepExecutionDetails.getParameters()).thenReturn(myParameters);
IResourcePidStream mockStream = new ListWrappingPidStream(
new HomogeneousResourcePidList("Patient", idList, null, null));
IResourcePidStream resourcePidStream = new ListWrappingPidStream(
new HomogeneousResourcePidList("Patient", idList, null, partitionId));
if (theListSize > 0) {
// Ensure none of the work chunks exceed MAX_BATCH_OF_IDS in size:
doAnswer(i -> {
@ -73,8 +76,7 @@ class ResourceIdListStepTest {
return null;
}).when(myDataSink).accept(any(ResourceIdListWorkChunkJson.class));
}
when(myIdChunkProducer.fetchResourceIdStream(any(), any(), any(), any()))
.thenReturn(mockStream);
when(myIdChunkProducer.fetchResourceIdStream(any())).thenReturn(resourcePidStream);
final RunOutcome run = myResourceIdListStep.run(myStepExecutionDetails, myDataSink);
assertThat(run).isNotEqualTo(null);
@ -86,8 +88,9 @@ class ResourceIdListStepTest {
assertThat(allDataChunks).hasSize(expectedBatchCount);
// Ensure that all chunks except the very last one are MAX_BATCH_OF_IDS in length
for (int i = 0; i < expectedBatchCount - 1; i++) {
assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, allDataChunks.get(i).size());
for (int i = 0; i < expectedBatchCount - 1; i++) {
ResourceIdListWorkChunkJson dataChunk = allDataChunks.get(i);
assertEquals(ResourceIdListStep.MAX_BATCH_OF_IDS, dataChunk.size());
assertEquals(partitionId, dataChunk.getRequestPartitionId());
}
// The very last chunk should be whatever is left over (if there is a remainder):

View File

@ -30,7 +30,7 @@
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage-batch2</artifactId>
<artifactId>hapi-fhir-storage-batch2-jobs</artifactId>
<version>${project.version}</version>
</dependency>

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep;
@ -32,11 +33,11 @@ import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import jakarta.annotation.Nonnull;
public class LoadGoldenIdsStep
implements IJobStepWorker<MdmClearJobParameters, MdmChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<MdmClearJobParameters, MdmChunkRangeJson> myResourceIdListStep;
implements IJobStepWorker<MdmClearJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<MdmClearJobParameters> myResourceIdListStep;
public LoadGoldenIdsStep(IGoldenResourceSearchSvc theGoldenResourceSearchSvc) {
IIdChunkProducer<MdmChunkRangeJson> idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc);
IIdChunkProducer<ChunkRangeJson> idChunkProducer = new MdmIdChunkProducer(theGoldenResourceSearchSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}
@ -44,7 +45,7 @@ public class LoadGoldenIdsStep
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<MdmClearJobParameters, MdmChunkRangeJson> theStepExecutionDetails,
@Nonnull StepExecutionDetails<MdmClearJobParameters, ChunkRangeJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink)
throws JobExecutionFailedException {
return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.jobs.config.BatchCommonCtx;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
@ -34,7 +35,7 @@ import static ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx.MDM_CLEAR_JOB_BEAN_NAM
import static ca.uhn.fhir.mdm.batch2.submit.MdmSubmitAppCtx.MDM_SUBMIT_JOB_BEAN_NAME;
@Configuration
@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class})
@Import({MdmClearAppCtx.class, MdmSubmitAppCtx.class, BatchCommonCtx.class})
public class MdmBatch2Config {
@Bean
MdmJobDefinitionLoader mdmJobDefinitionLoader(

View File

@ -1,40 +0,0 @@
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
public class MdmChunkRangeJson extends ChunkRangeJson {
@Nonnull
@JsonProperty("resourceType")
private String myResourceType;
@Nonnull
public String getResourceType() {
return myResourceType;
}
public void setResourceType(@Nullable String theResourceType) {
myResourceType = theResourceType;
}
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.util.Batch2Utils;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import jakarta.annotation.Nonnull;
@ -33,14 +34,14 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJobParameters, MdmChunkRangeJson> {
public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJobParameters, ChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(MdmGenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<MdmClearJobParameters, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<MdmChunkRangeJson> theDataSink)
@Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
throws JobExecutionFailedException {
MdmClearJobParameters params = theStepExecutionDetails.getParameters();
@ -49,10 +50,10 @@ public class MdmGenerateRangeChunksStep implements IFirstJobStepWorker<MdmClearJ
for (String nextResourceType : params.getResourceNames()) {
ourLog.info("Initiating mdm clear of [{}]] Golden Resources from {} to {}", nextResourceType, start, end);
MdmChunkRangeJson nextRange = new MdmChunkRangeJson();
nextRange.setResourceType(nextResourceType);
nextRange.setStart(start);
nextRange.setEnd(end);
assert nextResourceType != null;
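// The chunk now carries the resource type and the job's partition directly, so a dedicated MdmChunkRangeJson is no longer needed.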
ChunkRangeJson nextRange = new ChunkRangeJson(start, end)
.setResourceType(nextResourceType)
.setPartitionId(params.getRequestPartitionId());
theDataSink.accept(nextRange);
}

View File

@ -19,17 +19,14 @@
*/
package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
public class MdmIdChunkProducer implements IIdChunkProducer<ChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(MdmIdChunkProducer.class);
private final IGoldenResourceSearchSvc myGoldenResourceSearchSvc;
@ -38,17 +35,16 @@ public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
}
@Override
public IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, MdmChunkRangeJson theData) {
public IResourcePidStream fetchResourceIdStream(ChunkRangeJson theData) {
String resourceType = theData.getResourceType();
ourLog.info(
"Fetching golden resource ID chunk for resource type {} - Range {} - {}",
resourceType,
theStart,
theEnd);
theData.getStart(),
theData.getEnd());
return myGoldenResourceSearchSvc.fetchGoldenResourceIdStream(
theStart, theEnd, theRequestPartitionId, resourceType);
theData.getStart(), theData.getEnd(), theData.getPartitionId(), resourceType);
}
}

View File

@ -19,13 +19,13 @@
*/
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.batch2.LoadGoldenIdsStep;
import ca.uhn.fhir.mdm.batch2.MdmChunkRangeJson;
import ca.uhn.fhir.mdm.batch2.MdmGenerateRangeChunksStep;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -50,7 +50,7 @@ public class MdmClearAppCtx {
.addFirstStep(
"generate-ranges",
"Generate date ranges to Mdm Clear",
MdmChunkRangeJson.class,
ChunkRangeJson.class,
mdmGenerateRangeChunksStep())
.addIntermediateStep(
"find-golden-resource-ids",

View File

@ -19,7 +19,7 @@
*/
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.validation.constraints.Pattern;
@ -28,7 +28,7 @@ import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.List;
public class MdmClearJobParameters extends PartitionedJobParameters {
public class MdmClearJobParameters extends JobParameters {
@JsonProperty("resourceType")
@Nonnull
private List<@Pattern(regexp = "^[A-Z][A-Za-z]+$", message = "If populated, must be a valid resource type") String>

View File

@ -75,8 +75,7 @@ public class MdmClearStep implements IJobStepWorker<MdmClearJobParameters, Resou
SystemRequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(100);
requestDetails.setRequestPartitionId(
theStepExecutionDetails.getParameters().getRequestPartitionId());
requestDetails.setRequestPartitionId(theStepExecutionDetails.getData().getRequestPartitionId());
TransactionDetails transactionDetails = new TransactionDetails();
myHapiTransactionService.execute(
requestDetails,

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
@ -39,11 +39,6 @@ public class MdmSubmitAppCtx {
public static final String MDM_SUBMIT_JOB_BEAN_NAME = "mdmSubmitJobDefinition";
public static String MDM_SUBMIT_JOB = "MDM_SUBMIT";
@Bean
public GenerateRangeChunksStep<MdmSubmitJobParameters> submitGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean(name = MDM_SUBMIT_JOB_BEAN_NAME)
public JobDefinition<MdmSubmitJobParameters> mdmSubmitJobDefinition(
IBatch2DaoSvc theBatch2DaoSvc,
@ -60,7 +55,7 @@ public class MdmSubmitAppCtx {
.addFirstStep(
"generate-ranges",
"generate data ranges to submit to mdm",
PartitionedUrlChunkRangeJson.class,
ChunkRangeJson.class,
submitGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids", "Load the IDs", ResourceIdListWorkChunkJson.class, loadIdsStep(theBatch2DaoSvc))
@ -77,6 +72,11 @@ public class MdmSubmitAppCtx {
return new MdmSubmitJobParametersValidator(theMdmSettings, theMatchUrlService, theFhirContext);
}
@Bean
public GenerateRangeChunksStep<MdmSubmitJobParameters> submitGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean
public LoadIdsStep<MdmSubmitJobParameters> loadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);

View File

@ -19,6 +19,6 @@
*/
package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
public class MdmSubmitJobParameters extends PartitionedUrlListJobParameters {}
public class MdmSubmitJobParameters extends JobParameters {}

View File

@ -54,25 +54,25 @@ public class MdmSubmitJobParametersValidator implements IJobParametersValidator<
String resourceType = getResourceTypeFromUrl(url);
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resourceType);
validateTypeIsUsedByMdm(errorMsgs, resourceType);
validateAllSearchParametersApplyToResourceType(errorMsgs, partitionedUrl, resourceType, resourceDefinition);
validateAllSearchParametersApplyToResourceType(errorMsgs, url, resourceType, resourceDefinition);
}
return errorMsgs;
}
private void validateAllSearchParametersApplyToResourceType(
List<String> errorMsgs,
PartitionedUrl partitionedUrl,
String resourceType,
List<String> theErrorMessages,
String theUrl,
String theResourceType,
RuntimeResourceDefinition resourceDefinition) {
try {
myMatchUrlService.translateMatchUrl(partitionedUrl.getUrl(), resourceDefinition);
myMatchUrlService.translateMatchUrl(theUrl, resourceDefinition);
} catch (MatchUrlService.UnrecognizedSearchParameterException e) {
String errorMsg = String.format(
"Search parameter %s is not recognized for resource type %s. Source error is %s",
e.getParamName(), resourceType, e.getMessage());
errorMsgs.add(errorMsg);
e.getParamName(), theResourceType, e.getMessage());
theErrorMessages.add(errorMsg);
} catch (InvalidRequestException e) {
errorMsgs.add("Invalid request detected: " + e.getMessage());
theErrorMessages.add("Invalid request detected: " + e.getMessage());
}
}

View File

@ -38,8 +38,11 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -50,6 +53,7 @@ import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.doCal
import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.hasHooks;
public abstract class BaseRequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
private static final Logger ourLog = LoggerFactory.getLogger(BaseRequestPartitionHelperSvc.class);
private final HashSet<Object> myNonPartitionableResourceNames;
@ -95,81 +99,86 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Override
public RequestPartitionId determineReadPartitionForRequest(
@Nullable RequestDetails theRequest, @Nonnull ReadPartitionIdRequestDetails theDetails) {
RequestPartitionId requestPartitionId;
String resourceType = theDetails.getResourceType();
boolean nonPartitionableResource = !isResourcePartitionable(resourceType);
if (myPartitionSettings.isPartitioningEnabled()) {
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if (requestDetails == null) {
requestDetails = new SystemRequestDetails();
}
// Handle system requests
if (requestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails)
&& !nonPartitionableResource) {
requestPartitionId =
getSystemRequestPartitionId((SystemRequestDetails) requestDetails, nonPartitionableResource);
} else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) {
return RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId());
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails)
.add(ReadPartitionIdRequestDetails.class, theDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
} else {
requestPartitionId = null;
}
validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
return validateNormalizeAndNotifyHooksForRead(requestPartitionId, requestDetails, resourceType);
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
return RequestPartitionId.allPartitions();
// Certain use-cases (e.g. batch2 jobs) only have the resource type populated in the ReadPartitionIdRequestDetails
// TODO MM: see if we can make RequestDetails consistent
String resourceType = theDetails.getResourceType();
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if (requestDetails == null) {
requestDetails = new SystemRequestDetails();
}
boolean nonPartitionableResource = isResourceNonPartitionable(resourceType);
RequestPartitionId requestPartitionId = null;
// Handle system requests
if (requestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) requestDetails)
&& !nonPartitionableResource) {
requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) requestDetails, false);
} else if ((requestDetails instanceof SystemRequestDetails) && nonPartitionableResource) {
requestPartitionId = RequestPartitionId.fromPartitionId(myPartitionSettings.getDefaultPartitionId());
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails)
.add(ReadPartitionIdRequestDetails.class, theDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
}
validateRequestPartitionNotNull(
requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
validateRequestPartition(requestPartitionId, resourceType);
ourLog.info("Read with partition: {}", requestPartitionId);
return validateAndNormalizePartition(requestPartitionId, requestDetails, resourceType);
}
@Override
public RequestPartitionId determineGenericPartitionForRequest(RequestDetails theRequestDetails) {
RequestPartitionId retVal = null;
RequestPartitionId requestPartitionId = null;
if (myPartitionSettings.isPartitioningEnabled()) {
if (theRequestDetails instanceof SystemRequestDetails) {
SystemRequestDetails systemRequestDetails = (SystemRequestDetails) theRequestDetails;
retVal = systemRequestDetails.getRequestPartitionId();
}
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
if (retVal == null) {
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
retVal = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
if (retVal != null) {
retVal = validateNormalizeAndNotifyHooksForRead(retVal, theRequestDetails, null);
}
}
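// An explicit partition on a system request takes precedence over the partition-identifying interceptor hook.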
if (theRequestDetails instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) theRequestDetails)) {
requestPartitionId = getSystemRequestPartitionId((SystemRequestDetails) theRequestDetails);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
}
return retVal;
// TODO MM: at the moment it is ok for this method to return null
// check whether it can be made consistent and what its implications would be
// validateRequestPartitionNotNull(requestPartitionId, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY);
if (requestPartitionId != null) {
return validateAndNormalizePartition(
requestPartitionId, theRequestDetails, theRequestDetails.getResourceName());
}
return requestPartitionId;
}
/**
@ -216,56 +225,61 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Nonnull
@Override
public RequestPartitionId determineCreatePartitionForRequest(
@Nullable RequestDetails theRequest, @Nonnull IBaseResource theResource, @Nonnull String theResourceType) {
RequestPartitionId requestPartitionId;
@Nullable final RequestDetails theRequest,
@Nonnull IBaseResource theResource,
@Nonnull String theResourceType) {
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
boolean nonPartitionableResource = myNonPartitionableResourceNames.contains(theResourceType);
RequestDetails requestDetails = theRequest;
// TODO GGG eventually, theRequest will not be allowed to be null here, and we will pass through
// SystemRequestDetails instead.
if ((theRequest == null || theRequest instanceof SystemRequestDetails) && nonPartitionableResource) {
return RequestPartitionId.defaultPartition();
if (theRequest == null) {
requestDetails = new SystemRequestDetails();
}
boolean nonPartitionableResource = isResourceNonPartitionable(theResourceType);
RequestPartitionId requestPartitionId = null;
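// A system request with an explicit partition takes precedence; otherwise the partition-identifying interceptor hooks are consulted.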
if (theRequest instanceof SystemRequestDetails
&& systemRequestHasExplicitPartition((SystemRequestDetails) theRequest)) {
requestPartitionId =
getSystemRequestPartitionId((SystemRequestDetails) theRequest, nonPartitionableResource);
} else {
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, theRequest)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else {
// This is an external Request (e.g. ServletRequestDetails) so we want to figure out the partition
// via interceptor.
// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
HookParams params = new HookParams()
.add(IBaseResource.class, theResource)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
}
// If the interceptors haven't selected a partition, and its a non-partitionable resource anyhow, send
// to DEFAULT
if (nonPartitionableResource && requestPartitionId == null) {
requestPartitionId = RequestPartitionId.defaultPartition();
}
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_ANY
HookParams params = new HookParams()
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_ANY, params);
} else if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, myInterceptorBroadcaster, requestDetails)) {
// Interceptor call: STORAGE_PARTITION_IDENTIFY_CREATE
HookParams params = new HookParams()
.add(IBaseResource.class, theResource)
.add(RequestDetails.class, requestDetails)
.addIfMatchesType(ServletRequestDetails.class, requestDetails);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(
myInterceptorBroadcaster, requestDetails, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE, params);
}
String resourceName = myFhirContext.getResourceType(theResource);
validateSinglePartitionForCreate(requestPartitionId, resourceName, Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE);
// If the interceptors haven't selected a partition, and it's a non-partitionable resource anyhow, send
// to DEFAULT
if (nonPartitionableResource && requestPartitionId == null) {
requestPartitionId = RequestPartitionId.defaultPartition();
}
return validateNormalizeAndNotifyHooksForRead(requestPartitionId, theRequest, theResourceType);
validateRequestPartitionNotNull(
requestPartitionId,
Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE,
Pointcut.STORAGE_PARTITION_IDENTIFY_ANY);
validateSinglePartitionForCreate(requestPartitionId);
validateRequestPartition(requestPartitionId, theResourceType);
ourLog.info("Create with partition: {}", requestPartitionId);
return validateAndNormalizePartition(requestPartitionId, requestDetails, theResourceType);
}
private boolean systemRequestHasExplicitPartition(@Nonnull SystemRequestDetails theRequest) {
@ -288,7 +302,7 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
* If the partition has both, they are validated to ensure that they correspond.
*/
@Nonnull
private RequestPartitionId validateNormalizeAndNotifyHooksForRead(
private RequestPartitionId validateAndNormalizePartition(
@Nonnull RequestPartitionId theRequestPartitionId,
RequestDetails theRequest,
@Nullable String theResourceType) {
@ -330,25 +344,31 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
@Override
public boolean isResourcePartitionable(String theResourceType) {
return !myNonPartitionableResourceNames.contains(theResourceType);
return theResourceType != null && !myNonPartitionableResourceNames.contains(theResourceType);
}
private void validateSinglePartitionForCreate(
RequestPartitionId theRequestPartitionId, @Nonnull String theResourceName, Pointcut thePointcut) {
validateRequestPartitionNotNull(theRequestPartitionId, thePointcut);
private boolean isResourceNonPartitionable(String theResourceType) {
return theResourceType != null && !isResourcePartitionable(theResourceType);
}
private void validateSinglePartitionForCreate(RequestPartitionId theRequestPartitionId) {
if (theRequestPartitionId.hasPartitionIds()) {
validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionIds());
}
validateSinglePartitionIdOrNameForCreate(theRequestPartitionId.getPartitionNames());
}
private void validateRequestPartition(RequestPartitionId theRequestPartitionId, String theResourceName) {
// Make sure we're not using one of the conformance resources in a non-default partition
if (theRequestPartitionId.isDefaultPartition() || theRequestPartitionId.isAllPartitions()) {
return;
}
if ((theRequestPartitionId.hasPartitionIds()
&& !theRequestPartitionId.getPartitionIds().contains(null))
|| (theRequestPartitionId.hasPartitionNames()
&& !theRequestPartitionId.getPartitionNames().contains(JpaConstants.DEFAULT_PARTITION_NAME))) {
if (!isResourcePartitionable(theResourceName)) {
if (isResourceNonPartitionable(theResourceName)) {
String msg = myFhirContext
.getLocalizer()
.getMessageSanitized(
@ -360,10 +380,10 @@ public abstract class BaseRequestPartitionHelperSvc implements IRequestPartition
}
}
private void validateRequestPartitionNotNull(RequestPartitionId theRequestPartitionId, Pointcut theThePointcut) {
private void validateRequestPartitionNotNull(RequestPartitionId theRequestPartitionId, Pointcut... thePointcuts) {
if (theRequestPartitionId == null) {
throw new InternalErrorException(
Msg.code(1319) + "No interceptor provided a value for pointcut: " + theThePointcut);
Msg.code(1319) + "No interceptor provided a value for pointcuts: " + Arrays.toString(thePointcuts));
}
}