This commit is contained in:
leif stawnyczy 2024-09-27 16:08:08 -04:00
parent e5ba5a535e
commit 873f679316
5 changed files with 76 additions and 77 deletions

View File

@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.FhirVersionEnum;

View File

@ -56,7 +56,6 @@ public class ReindexStepV1 implements IJobStepWorker<ReindexJobParameters, Resou
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService; private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
public ReindexStepV1( public ReindexStepV1(
HapiTransactionService theHapiTransactionService, HapiTransactionService theHapiTransactionService,
IFhirSystemDao<?, ?> theSystemDao, IFhirSystemDao<?, ?> theSystemDao,
@ -89,11 +88,11 @@ public class ReindexStepV1 implements IJobStepWorker<ReindexJobParameters, Resou
} }
public ReindexResults doReindex( public ReindexResults doReindex(
ResourceIdListWorkChunkJson data, ResourceIdListWorkChunkJson data,
IJobDataSink<?> theDataSink, IJobDataSink<?> theDataSink,
String theInstanceId, String theInstanceId,
String theChunkId, String theChunkId,
ReindexJobParameters theJobParameters) { ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails(); RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true); requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
@ -101,19 +100,19 @@ public class ReindexStepV1 implements IJobStepWorker<ReindexJobParameters, Resou
TransactionDetails transactionDetails = new TransactionDetails(); TransactionDetails transactionDetails = new TransactionDetails();
ReindexTask.JobParameters jp = new ReindexTask.JobParameters(); ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
jp.setData(data) jp.setData(data)
.setRequestDetails(requestDetails) .setRequestDetails(requestDetails)
.setTransactionDetails(transactionDetails) .setTransactionDetails(transactionDetails)
.setDataSink(theDataSink) .setDataSink(theDataSink)
.setInstanceId(theInstanceId) .setInstanceId(theInstanceId)
.setChunkId(theChunkId) .setChunkId(theChunkId)
.setJobParameters(theJobParameters); .setJobParameters(theJobParameters);
ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService); ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
return myHapiTransactionService return myHapiTransactionService
.withRequest(requestDetails) .withRequest(requestDetails)
.withTransactionDetails(transactionDetails) .withTransactionDetails(transactionDetails)
.withRequestPartitionId(data.getRequestPartitionId()) .withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob); .execute(reindexJob);
} }
} }

View File

@ -29,6 +29,7 @@ import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
public class ReindexV1Config { public class ReindexV1Config {
@Autowired @Autowired
private ReindexJobService myReindexJobService; private ReindexJobService myReindexJobService;
@Autowired @Autowired
private HapiTransactionService myHapiTransactionService; private HapiTransactionService myHapiTransactionService;
@ -56,24 +57,24 @@ public class ReindexV1Config {
@Bean @Bean
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1() { public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1() {
return JobDefinition.newBuilder() return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_REINDEX) .setJobDefinitionId(JOB_REINDEX)
.setJobDescription("Reindex resources") .setJobDescription("Reindex resources")
.setJobDefinitionVersion(1) .setJobDefinitionVersion(1)
.setParametersType(ReindexJobParameters.class) .setParametersType(ReindexJobParameters.class)
.setParametersValidator(myReindexJobParametersValidatorV1) .setParametersValidator(myReindexJobParametersValidatorV1)
.gatedExecution() .gatedExecution()
.addFirstStep( .addFirstStep(
"generate-ranges", "generate-ranges",
"Generate data ranges to reindex", "Generate data ranges to reindex",
ChunkRangeJson.class, ChunkRangeJson.class,
myReindexGenerateRangeChunkStep) myReindexGenerateRangeChunkStep)
.addIntermediateStep( .addIntermediateStep(
"load-ids", "load-ids",
"Load IDs of resources to reindex", "Load IDs of resources to reindex",
ResourceIdListWorkChunkJson.class, ResourceIdListWorkChunkJson.class,
myReindexLoadIdsStep) myReindexLoadIdsStep)
.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1()) .addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
.build(); .build();
} }
@Bean @Bean
@ -88,13 +89,13 @@ public class ReindexV1Config {
@Bean("reindexLoadIdsStepV1") @Bean("reindexLoadIdsStepV1")
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep( public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) { IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc); return new LoadIdsStep<>(theBatch2DaoSvc);
} }
@Bean @Bean
public ReindexJobParametersValidatorV1 reindexJobParametersValidatorV1(IBatch2DaoSvc theBatch2DaoSvc) { public ReindexJobParametersValidatorV1 reindexJobParametersValidatorV1(IBatch2DaoSvc theBatch2DaoSvc) {
return new ReindexJobParametersValidatorV1( return new ReindexJobParametersValidatorV1(
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc)); new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
} }
} }

View File

@ -40,7 +40,6 @@ public class ReindexStepV2
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService; private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
public ReindexStepV2( public ReindexStepV2(
ReindexJobService theJobService, ReindexJobService theJobService,
HapiTransactionService theHapiTransactionService, HapiTransactionService theHapiTransactionService,
@ -90,11 +89,11 @@ public class ReindexStepV2
} }
public ReindexResults doReindex( public ReindexResults doReindex(
ResourceIdListWorkChunkJson data, ResourceIdListWorkChunkJson data,
IJobDataSink<?> theDataSink, IJobDataSink<?> theDataSink,
String theInstanceId, String theInstanceId,
String theChunkId, String theChunkId,
ReindexJobParameters theJobParameters) { ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails(); RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true); requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
@ -102,19 +101,19 @@ public class ReindexStepV2
TransactionDetails transactionDetails = new TransactionDetails(); TransactionDetails transactionDetails = new TransactionDetails();
ReindexTask.JobParameters jp = new ReindexTask.JobParameters(); ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
jp.setData(data) jp.setData(data)
.setRequestDetails(requestDetails) .setRequestDetails(requestDetails)
.setTransactionDetails(transactionDetails) .setTransactionDetails(transactionDetails)
.setDataSink(theDataSink) .setDataSink(theDataSink)
.setInstanceId(theInstanceId) .setInstanceId(theInstanceId)
.setChunkId(theChunkId) .setChunkId(theChunkId)
.setJobParameters(theJobParameters); .setJobParameters(theJobParameters);
ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService); ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
return myHapiTransactionService return myHapiTransactionService
.withRequest(requestDetails) .withRequest(requestDetails)
.withTransactionDetails(transactionDetails) .withTransactionDetails(transactionDetails)
.withRequestPartitionId(data.getRequestPartitionId()) .withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob); .execute(reindexJob);
} }
} }

View File

@ -30,6 +30,7 @@ public class ReindexV2Config {
@Autowired @Autowired
private ReindexJobService myReindexJobService; private ReindexJobService myReindexJobService;
@Autowired @Autowired
private HapiTransactionService myHapiTransactionService; private HapiTransactionService myHapiTransactionService;
@ -53,31 +54,30 @@ public class ReindexV2Config {
@Autowired @Autowired
private ReindexJobParametersValidatorV2 myReindexJobParametersValidator; private ReindexJobParametersValidatorV2 myReindexJobParametersValidator;
// Version 2 // Version 2
@Bean @Bean
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2() { public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2() {
return JobDefinition.newBuilder() return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_REINDEX) .setJobDefinitionId(JOB_REINDEX)
.setJobDescription("Reindex resources") .setJobDescription("Reindex resources")
.setJobDefinitionVersion(2) .setJobDefinitionVersion(2)
.setParametersType(ReindexJobParameters.class) .setParametersType(ReindexJobParameters.class)
.setParametersValidator(myReindexJobParametersValidator) .setParametersValidator(myReindexJobParametersValidator)
.gatedExecution() .gatedExecution()
.addFirstStep( .addFirstStep(
"generate-ranges", "generate-ranges",
"Generate data ranges to reindex", "Generate data ranges to reindex",
ChunkRangeJson.class, ChunkRangeJson.class,
myReindexGenerateRangeChunkStep) myReindexGenerateRangeChunkStep)
.addIntermediateStep( .addIntermediateStep(
"load-ids", "load-ids",
"Load IDs of resources to reindex", "Load IDs of resources to reindex",
ResourceIdListWorkChunkJson.class, ResourceIdListWorkChunkJson.class,
myReindexLoadIdsStep) myReindexLoadIdsStep)
.addIntermediateStep( .addIntermediateStep(
"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2()) "reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep()) .addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
.build(); .build();
} }
@Bean @Bean
@ -87,7 +87,8 @@ public class ReindexV2Config {
@Bean @Bean
public ReindexStepV2 reindexStepV2() { public ReindexStepV2 reindexStepV2() {
return new ReindexStepV2(myReindexJobService, myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService); return new ReindexStepV2(
myReindexJobService, myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
} }
@Bean("reindexGenerateRangeChunkStepV2") @Bean("reindexGenerateRangeChunkStepV2")
@ -97,13 +98,13 @@ public class ReindexV2Config {
@Bean("reindexLoadIdsStepV2") @Bean("reindexLoadIdsStepV2")
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep( public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) { IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc); return new LoadIdsStep<>(theBatch2DaoSvc);
} }
@Bean @Bean
public ReindexJobParametersValidatorV2 reindexJobParametersValidatorV2(IBatch2DaoSvc theBatch2DaoSvc) { public ReindexJobParametersValidatorV2 reindexJobParametersValidatorV2(IBatch2DaoSvc theBatch2DaoSvc) {
return new ReindexJobParametersValidatorV2( return new ReindexJobParametersValidatorV2(
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc)); new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
} }
} }