review points
This commit is contained in:
parent
5e13c2e41e
commit
e5ba5a535e
|
@ -158,6 +158,7 @@ import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static java.util.Objects.isNull;
|
import static java.util.Objects.isNull;
|
||||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
@ -1315,7 +1316,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
|
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
|
||||||
|
|
||||||
JobInstanceStartRequest request = new JobInstanceStartRequest();
|
JobInstanceStartRequest request = new JobInstanceStartRequest();
|
||||||
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
request.setJobDefinitionId(JOB_REINDEX);
|
||||||
request.setParameters(params);
|
request.setParameters(params);
|
||||||
myJobCoordinator.startInstance(theRequestDetails, request);
|
myJobCoordinator.startInstance(theRequestDetails, request);
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
|
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
|
||||||
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
|
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
@ -990,7 +991,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test
|
||||||
parameters.addUrl(url);
|
parameters.addUrl(url);
|
||||||
}
|
}
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
ourLog.info("Started reindex job with id {}", res.getInstanceId());
|
ourLog.info("Started reindex job with id {}", res.getInstanceId());
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
@ -312,7 +313,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test {
|
||||||
parameters.addUrl(url);
|
parameters.addUrl(url);
|
||||||
}
|
}
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
ourLog.info("Started reindex job with id {}", res.getInstanceId());
|
ourLog.info("Started reindex job with id {}", res.getInstanceId());
|
||||||
|
|
|
@ -7,7 +7,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
|
||||||
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep;
|
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
|
import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package ca.uhn.fhir.jpa.dao.r4;
|
package ca.uhn.fhir.jpa.dao.r4;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
|
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.context.RuntimeSearchParam;
|
import ca.uhn.fhir.context.RuntimeSearchParam;
|
||||||
import ca.uhn.fhir.context.phonetic.PhoneticEncoderEnum;
|
import ca.uhn.fhir.context.phonetic.PhoneticEncoderEnum;
|
||||||
|
@ -22,7 +21,6 @@ import ca.uhn.fhir.rest.param.NumberParam;
|
||||||
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
|
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
|
||||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||||
import ca.uhn.fhir.rest.param.StringParam;
|
import ca.uhn.fhir.rest.param.StringParam;
|
||||||
import ca.uhn.fhir.rest.param.TokenOrListParam;
|
|
||||||
import ca.uhn.fhir.rest.param.TokenParam;
|
import ca.uhn.fhir.rest.param.TokenParam;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||||
|
@ -70,6 +68,7 @@ import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.apache.commons.lang3.StringUtils.countMatches;
|
import static org.apache.commons.lang3.StringUtils.countMatches;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
@ -441,11 +440,11 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
|
||||||
fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
|
fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
|
||||||
fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);
|
fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);
|
||||||
|
|
||||||
List<JobInstance> initialJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
|
List<JobInstance> initialJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
|
||||||
|
|
||||||
mySearchParameterDao.create(fooSp, mySrd);
|
mySearchParameterDao.create(fooSp, mySrd);
|
||||||
|
|
||||||
List<JobInstance> finalJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
|
List<JobInstance> finalJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
|
||||||
List<JobInstance> newJobs = finalJobs.stream().filter(t -> !initialJobs.contains(t)).collect(Collectors.toList());
|
List<JobInstance> newJobs = finalJobs.stream().filter(t -> !initialJobs.contains(t)).collect(Collectors.toList());
|
||||||
assertThat(newJobs.size()).as("number of jobs created").isEqualTo(1);
|
assertThat(newJobs.size()).as("number of jobs created").isEqualTo(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
|
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
|
import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
|
||||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
|
@ -1,8 +1,5 @@
|
||||||
package ca.uhn.fhir.jpa.provider.r4;
|
package ca.uhn.fhir.jpa.provider.r4;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
||||||
|
@ -60,9 +57,11 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
|
||||||
|
@ -244,7 +243,7 @@ public class ResourceProviderCustomSearchParamR4Test extends BaseResourceProvide
|
||||||
runInTransaction(() -> {
|
runInTransaction(() -> {
|
||||||
myBatch2JobHelper.forceRunMaintenancePass();
|
myBatch2JobHelper.forceRunMaintenancePass();
|
||||||
|
|
||||||
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
|
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
|
||||||
assertEquals(1, allJobs.size());
|
assertEquals(1, allJobs.size());
|
||||||
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
|
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
|
||||||
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());
|
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());
|
||||||
|
|
|
@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
|
import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||||
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
|
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
|
||||||
|
|
|
@ -2,7 +2,6 @@ package ca.uhn.fhir.jpa.reindex;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
||||||
import ca.uhn.fhir.batch2.api.IJobPersistence;
|
import ca.uhn.fhir.batch2.api.IJobPersistence;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
||||||
|
@ -39,6 +38,7 @@ import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
@ -101,7 +101,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(
|
startRequest.setParameters(
|
||||||
new ReindexJobParameters()
|
new ReindexJobParameters()
|
||||||
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
|
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
|
||||||
|
@ -158,7 +158,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(
|
startRequest.setParameters(
|
||||||
new ReindexJobParameters()
|
new ReindexJobParameters()
|
||||||
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
|
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
|
||||||
|
@ -217,7 +217,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(
|
startRequest.setParameters(
|
||||||
new ReindexJobParameters()
|
new ReindexJobParameters()
|
||||||
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
|
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
|
||||||
|
@ -252,7 +252,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(
|
startRequest.setParameters(
|
||||||
new ReindexJobParameters()
|
new ReindexJobParameters()
|
||||||
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
|
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
|
||||||
|
@ -294,7 +294,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
myBatch2JobHelper.awaitJobCompletion(res);
|
myBatch2JobHelper.awaitJobCompletion(res);
|
||||||
|
@ -325,7 +325,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
|
||||||
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
|
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
|
||||||
|
@ -356,7 +356,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
parameters.addUrl("Observation?status=final");
|
parameters.addUrl("Observation?status=final");
|
||||||
|
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
myBatch2JobHelper.awaitJobCompletion(res);
|
myBatch2JobHelper.awaitJobCompletion(res);
|
||||||
|
@ -387,7 +387,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
myBatch2JobHelper.awaitJobCompletion(startResponse);
|
myBatch2JobHelper.awaitJobCompletion(startResponse);
|
||||||
|
@ -406,7 +406,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter();
|
DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter();
|
||||||
|
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
||||||
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse);
|
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse);
|
||||||
|
@ -436,7 +436,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Run a reindex
|
// Run a reindex
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
||||||
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
|
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
|
||||||
|
@ -469,7 +469,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Run a reindex
|
// Run a reindex
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
||||||
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
|
JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
|
||||||
|
@ -500,7 +500,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
|
||||||
JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
|
JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
|
||||||
|
@ -528,7 +528,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(new ReindexJobParameters());
|
startRequest.setParameters(new ReindexJobParameters());
|
||||||
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
||||||
JobInstance outcome = myBatch2JobHelper.awaitJobFailure(startResponse);
|
JobInstance outcome = myBatch2JobHelper.awaitJobFailure(startResponse);
|
||||||
|
@ -541,7 +541,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReindex_withReindexingUponSearchParameterChangeEnabled_reindexJobCompleted() {
|
public void testReindex_withReindexingUponSearchParameterChangeEnabled_reindexJobCompleted() {
|
||||||
List<JobInstance> jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0);
|
List<JobInstance> jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0);
|
||||||
assertThat(jobInstances).isEmpty();
|
assertThat(jobInstances).isEmpty();
|
||||||
|
|
||||||
// make sure the resources auto-reindex after the search parameter update is enabled
|
// make sure the resources auto-reindex after the search parameter update is enabled
|
||||||
|
@ -552,7 +552,7 @@ public class ReindexTaskTest extends BaseJpaR4Test {
|
||||||
myReindexTestHelper.createCodeSearchParameter();
|
myReindexTestHelper.createCodeSearchParameter();
|
||||||
|
|
||||||
// check that reindex job was created
|
// check that reindex job was created
|
||||||
jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0);
|
jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0);
|
||||||
assertThat(jobInstances).hasSize(1);
|
assertThat(jobInstances).hasSize(1);
|
||||||
|
|
||||||
// check that the job is completed (not stuck in QUEUED status)
|
// check that the job is completed (not stuck in QUEUED status)
|
||||||
|
|
|
@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.reindex;
|
||||||
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
|
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
|
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
|
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
||||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||||
|
@ -25,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test {
|
public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test {
|
||||||
|
@ -133,7 +133,7 @@ public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(parameters);
|
startRequest.setParameters(parameters);
|
||||||
Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
|
||||||
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
|
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package ca.uhn.fhir.jpa.term;
|
package ca.uhn.fhir.jpa.term;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
|
||||||
|
@ -68,6 +67,7 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE;
|
import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
|
@ -2229,7 +2229,7 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet
|
||||||
ReindexJobParameters params = new ReindexJobParameters();
|
ReindexJobParameters params = new ReindexJobParameters();
|
||||||
params.addUrl("CodeSystem?");
|
params.addUrl("CodeSystem?");
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
startRequest.setParameters(params);
|
startRequest.setParameters(params);
|
||||||
|
|
||||||
// and wait for it to complete
|
// and wait for it to complete
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.junit.jupiter.params.provider.Arguments;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
@ -170,7 +171,7 @@ public class PatientReindexTestHelper {
|
||||||
|
|
||||||
private JobInstanceStartRequest createPatientReindexRequest(int theBatchSize) {
|
private JobInstanceStartRequest createPatientReindexRequest(int theBatchSize) {
|
||||||
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
|
||||||
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
startRequest.setJobDefinitionId(JOB_REINDEX);
|
||||||
|
|
||||||
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
|
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
|
||||||
reindexJobParameters.setBatchSize(Math.max(theBatchSize,1));
|
reindexJobParameters.setBatchSize(Math.max(theBatchSize,1));
|
||||||
|
|
|
@ -17,11 +17,15 @@ import org.springframework.test.util.ReflectionTestUtils;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.ArgumentMatchers.same;
|
import static org.mockito.ArgumentMatchers.same;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -67,8 +71,15 @@ public class TermDeferredStorageSvcImplTest {
|
||||||
|
|
||||||
ReflectionTestUtils.setField(mySvc, "myJobExecutions", mockExecutions);
|
ReflectionTestUtils.setField(mySvc, "myJobExecutions", mockExecutions);
|
||||||
|
|
||||||
when(myJobCoordinator.getInstance(eq(jobId)))
|
when(myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
|
||||||
.thenReturn(instance);
|
eq(TERM_CODE_SYSTEM_DELETE_JOB_NAME),
|
||||||
|
eq(true),
|
||||||
|
anyInt(),
|
||||||
|
eq(0)
|
||||||
|
))
|
||||||
|
.thenReturn(List.of()) // first nothing
|
||||||
|
.thenReturn(List.of(instance)); // then the list with the instance
|
||||||
|
|
||||||
assertFalse(mySvc.isStorageQueueEmpty(true));
|
assertFalse(mySvc.isStorageQueueEmpty(true));
|
||||||
instance.setStatus(StatusEnum.COMPLETED);
|
instance.setStatus(StatusEnum.COMPLETED);
|
||||||
assertTrue(mySvc.isStorageQueueEmpty(true));
|
assertTrue(mySvc.isStorageQueueEmpty(true));
|
||||||
|
|
|
@ -1,66 +0,0 @@
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
|
||||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
|
||||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
|
||||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
|
||||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
|
||||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
|
||||||
|
|
||||||
public abstract class BaseReindexStep {
|
|
||||||
|
|
||||||
public static final int REINDEX_MAX_RETRIES = 10;
|
|
||||||
|
|
||||||
protected final HapiTransactionService myHapiTransactionService;
|
|
||||||
|
|
||||||
protected final IFhirSystemDao<?, ?> mySystemDao;
|
|
||||||
|
|
||||||
protected final DaoRegistry myDaoRegistry;
|
|
||||||
|
|
||||||
protected final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
|
||||||
|
|
||||||
public BaseReindexStep(
|
|
||||||
HapiTransactionService theHapiTransactionService,
|
|
||||||
IFhirSystemDao<?, ?> theSystemDao,
|
|
||||||
DaoRegistry theRegistry,
|
|
||||||
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
|
|
||||||
myHapiTransactionService = theHapiTransactionService;
|
|
||||||
mySystemDao = theSystemDao;
|
|
||||||
myDaoRegistry = theRegistry;
|
|
||||||
myIdHelperService = theIdHelperService;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReindexResults doReindex(
|
|
||||||
ResourceIdListWorkChunkJson data,
|
|
||||||
IJobDataSink<?> theDataSink,
|
|
||||||
String theInstanceId,
|
|
||||||
String theChunkId,
|
|
||||||
ReindexJobParameters theJobParameters) {
|
|
||||||
RequestDetails requestDetails = new SystemRequestDetails();
|
|
||||||
requestDetails.setRetry(true);
|
|
||||||
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
|
|
||||||
|
|
||||||
TransactionDetails transactionDetails = new TransactionDetails();
|
|
||||||
ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
|
|
||||||
jp.setData(data)
|
|
||||||
.setRequestDetails(requestDetails)
|
|
||||||
.setTransactionDetails(transactionDetails)
|
|
||||||
.setDataSink(theDataSink)
|
|
||||||
.setInstanceId(theInstanceId)
|
|
||||||
.setChunkId(theChunkId)
|
|
||||||
.setJobParameters(theJobParameters);
|
|
||||||
|
|
||||||
ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
|
|
||||||
|
|
||||||
return myHapiTransactionService
|
|
||||||
.withRequest(requestDetails)
|
|
||||||
.withTransactionDetails(transactionDetails)
|
|
||||||
.withRequestPartitionId(data.getRequestPartitionId())
|
|
||||||
.execute(reindexJob);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,33 +21,24 @@ package ca.uhn.fhir.batch2.jobs.reindex;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
import ca.uhn.fhir.batch2.api.IJobCoordinator;
|
||||||
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
|
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
|
||||||
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
|
||||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
||||||
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
|
import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexV1Config;
|
||||||
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
|
import ca.uhn.fhir.batch2.jobs.reindex.v2.ReindexV2Config;
|
||||||
import ca.uhn.fhir.batch2.model.JobDefinition;
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||||
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
|
|
||||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
|
@Import({ReindexV1Config.class, ReindexV2Config.class})
|
||||||
public class ReindexAppCtx {
|
public class ReindexAppCtx {
|
||||||
|
|
||||||
public static final String JOB_REINDEX = "REINDEX";
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private HapiTransactionService myHapiTransactionService;
|
private HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
@ -60,87 +51,7 @@ public class ReindexAppCtx {
|
||||||
@Autowired
|
@Autowired
|
||||||
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
||||||
|
|
||||||
// Version 2
|
/* Shared services */
|
||||||
@Bean
|
|
||||||
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2(IBatch2DaoSvc theBatch2DaoSvc) {
|
|
||||||
return JobDefinition.newBuilder()
|
|
||||||
.setJobDefinitionId(JOB_REINDEX)
|
|
||||||
.setJobDescription("Reindex resources")
|
|
||||||
.setJobDefinitionVersion(2)
|
|
||||||
.setParametersType(ReindexJobParameters.class)
|
|
||||||
.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
|
|
||||||
.gatedExecution()
|
|
||||||
.addFirstStep(
|
|
||||||
"generate-ranges",
|
|
||||||
"Generate data ranges to reindex",
|
|
||||||
ChunkRangeJson.class,
|
|
||||||
reindexGenerateRangeChunksStep())
|
|
||||||
.addIntermediateStep(
|
|
||||||
"load-ids",
|
|
||||||
"Load IDs of resources to reindex",
|
|
||||||
ResourceIdListWorkChunkJson.class,
|
|
||||||
reindexLoadIdsStep(theBatch2DaoSvc))
|
|
||||||
.addIntermediateStep(
|
|
||||||
"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
|
|
||||||
.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Version 1
|
|
||||||
@Bean
|
|
||||||
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1(IBatch2DaoSvc theBatch2DaoSvc) {
|
|
||||||
return JobDefinition.newBuilder()
|
|
||||||
.setJobDefinitionId(JOB_REINDEX)
|
|
||||||
.setJobDescription("Reindex resources")
|
|
||||||
.setJobDefinitionVersion(1)
|
|
||||||
.setParametersType(ReindexJobParameters.class)
|
|
||||||
.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
|
|
||||||
.gatedExecution()
|
|
||||||
.addFirstStep(
|
|
||||||
"generate-ranges",
|
|
||||||
"Generate data ranges to reindex",
|
|
||||||
ChunkRangeJson.class,
|
|
||||||
reindexGenerateRangeChunksStep())
|
|
||||||
.addIntermediateStep(
|
|
||||||
"load-ids",
|
|
||||||
"Load IDs of resources to reindex",
|
|
||||||
ResourceIdListWorkChunkJson.class,
|
|
||||||
reindexLoadIdsStep(theBatch2DaoSvc))
|
|
||||||
.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
|
|
||||||
return new GenerateRangeChunksStep<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
|
|
||||||
IBatch2DaoSvc theBatch2DaoSvc) {
|
|
||||||
return new LoadIdsStep<>(theBatch2DaoSvc);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ReindexJobParametersValidator reindexJobParametersValidator(IBatch2DaoSvc theBatch2DaoSvc) {
|
|
||||||
return new ReindexJobParametersValidator(
|
|
||||||
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ReindexStepV1 reindexStepV1() {
|
|
||||||
return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ReindexStepV2 reindexStepV2() {
|
|
||||||
return new ReindexStepV2(jobService(), myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public CheckPendingReindexWorkStep pendingWorkStep() {
|
|
||||||
return new CheckPendingReindexWorkStep(jobService());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ReindexProvider reindexProvider(
|
public ReindexProvider reindexProvider(
|
||||||
|
|
|
@ -44,6 +44,7 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE;
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE;
|
||||||
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS;
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS;
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
|
|
||||||
public class ReindexProvider {
|
public class ReindexProvider {
|
||||||
|
|
||||||
|
@ -127,7 +128,7 @@ public class ReindexProvider {
|
||||||
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
|
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
|
||||||
|
|
||||||
JobInstanceStartRequest request = new JobInstanceStartRequest();
|
JobInstanceStartRequest request = new JobInstanceStartRequest();
|
||||||
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
|
request.setJobDefinitionId(JOB_REINDEX);
|
||||||
request.setParameters(params);
|
request.setParameters(params);
|
||||||
Batch2JobStartResponse response = myJobCoordinator.startInstance(theRequestDetails, request);
|
Batch2JobStartResponse response = myJobCoordinator.startInstance(theRequestDetails, request);
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,13 @@ import java.time.temporal.ChronoUnit;
|
||||||
|
|
||||||
public class ReindexUtils {
|
public class ReindexUtils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reindex job definition id
|
||||||
|
*/
|
||||||
|
public static final String JOB_REINDEX = "REINDEX";
|
||||||
|
|
||||||
|
public static final int REINDEX_MAX_RETRIES = 10;
|
||||||
|
|
||||||
private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS);
|
private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS);
|
||||||
|
|
||||||
private static Duration myDelay;
|
private static Duration myDelay;
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*-
|
||||||
|
* #%L
|
||||||
|
* hapi-fhir-storage-batch2-jobs
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||||
|
* %%
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
package ca.uhn.fhir.batch2.jobs.reindex.v1;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import jakarta.annotation.Nonnull;
|
||||||
|
import jakarta.annotation.Nullable;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Deprecated(forRemoval = true, since = "7.6.0")
|
||||||
|
public class ReindexJobParametersValidatorV1 implements IJobParametersValidator<ReindexJobParameters> {
|
||||||
|
|
||||||
|
private final IUrlListValidator myUrlListValidator;
|
||||||
|
|
||||||
|
public ReindexJobParametersValidatorV1(IUrlListValidator theUrlListValidator) {
|
||||||
|
myUrlListValidator = theUrlListValidator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public List<String> validate(RequestDetails theRequestDetails, @Nonnull ReindexJobParameters theParameters) {
|
||||||
|
List<String> errors = myUrlListValidator.validateUrls(theParameters.getUrls());
|
||||||
|
|
||||||
|
if (errors == null || errors.isEmpty()) {
|
||||||
|
// only check if there's no other errors (new list to fix immutable issues)
|
||||||
|
errors = new ArrayList<>();
|
||||||
|
for (String url : theParameters.getUrls()) {
|
||||||
|
if (url.contains(" ") || url.contains("\n") || url.contains("\t")) {
|
||||||
|
errors.add("Invalid URL. URL cannot contain spaces : " + url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,7 +17,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
package ca.uhn.fhir.batch2.jobs.reindex.v1;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
||||||
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||||
|
@ -26,26 +26,46 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
|
||||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexTask;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
||||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||||
import jakarta.annotation.Nonnull;
|
import jakarta.annotation.Nonnull;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class ReindexStepV1 extends BaseReindexStep
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES;
|
||||||
implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
|
|
||||||
|
@Deprecated(forRemoval = true, since = "7.6.0")
|
||||||
|
public class ReindexStepV1 implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
|
||||||
|
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class);
|
||||||
|
|
||||||
|
private final HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
private final IFhirSystemDao<?, ?> mySystemDao;
|
||||||
|
|
||||||
|
private final DaoRegistry myDaoRegistry;
|
||||||
|
|
||||||
|
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
||||||
|
|
||||||
|
|
||||||
public ReindexStepV1(
|
public ReindexStepV1(
|
||||||
HapiTransactionService theHapiTransactionService,
|
HapiTransactionService theHapiTransactionService,
|
||||||
IFhirSystemDao<?, ?> theSystemDao,
|
IFhirSystemDao<?, ?> theSystemDao,
|
||||||
DaoRegistry theRegistry,
|
DaoRegistry theRegistry,
|
||||||
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
|
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
|
||||||
super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
|
myDaoRegistry = theRegistry;
|
||||||
|
myHapiTransactionService = theHapiTransactionService;
|
||||||
|
mySystemDao = theSystemDao;
|
||||||
|
myIdHelperService = theIdHelperService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
@ -67,4 +87,33 @@ public class ReindexStepV1 extends BaseReindexStep
|
||||||
|
|
||||||
return new RunOutcome(data.size());
|
return new RunOutcome(data.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ReindexResults doReindex(
|
||||||
|
ResourceIdListWorkChunkJson data,
|
||||||
|
IJobDataSink<?> theDataSink,
|
||||||
|
String theInstanceId,
|
||||||
|
String theChunkId,
|
||||||
|
ReindexJobParameters theJobParameters) {
|
||||||
|
RequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
requestDetails.setRetry(true);
|
||||||
|
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
|
||||||
|
|
||||||
|
TransactionDetails transactionDetails = new TransactionDetails();
|
||||||
|
ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
|
||||||
|
jp.setData(data)
|
||||||
|
.setRequestDetails(requestDetails)
|
||||||
|
.setTransactionDetails(transactionDetails)
|
||||||
|
.setDataSink(theDataSink)
|
||||||
|
.setInstanceId(theInstanceId)
|
||||||
|
.setChunkId(theChunkId)
|
||||||
|
.setJobParameters(theJobParameters);
|
||||||
|
|
||||||
|
ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
|
||||||
|
|
||||||
|
return myHapiTransactionService
|
||||||
|
.withRequest(requestDetails)
|
||||||
|
.withTransactionDetails(transactionDetails)
|
||||||
|
.withRequestPartitionId(data.getRequestPartitionId())
|
||||||
|
.execute(reindexJob);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
package ca.uhn.fhir.batch2.jobs.reindex.v1;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||||
|
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
|
||||||
|
import ca.uhn.fhir.batch2.model.JobDefinition;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||||
|
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
|
||||||
|
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||||
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
|
|
||||||
|
@Deprecated(forRemoval = true, since = "7.6.0")
|
||||||
|
@Configuration
|
||||||
|
public class ReindexV1Config {
|
||||||
|
@Autowired
|
||||||
|
private ReindexJobService myReindexJobService;
|
||||||
|
@Autowired
|
||||||
|
private HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IFhirSystemDao<?, ?> mySystemDao;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DaoRegistry myRegistry;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("reindexGenerateRangeChunkStepV1")
|
||||||
|
private IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> myReindexGenerateRangeChunkStep;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("reindexLoadIdsStepV1")
|
||||||
|
private IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> myReindexLoadIdsStep;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ReindexJobParametersValidatorV1 myReindexJobParametersValidatorV1;
|
||||||
|
|
||||||
|
// Version 1
|
||||||
|
@Bean
|
||||||
|
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1() {
|
||||||
|
return JobDefinition.newBuilder()
|
||||||
|
.setJobDefinitionId(JOB_REINDEX)
|
||||||
|
.setJobDescription("Reindex resources")
|
||||||
|
.setJobDefinitionVersion(1)
|
||||||
|
.setParametersType(ReindexJobParameters.class)
|
||||||
|
.setParametersValidator(myReindexJobParametersValidatorV1)
|
||||||
|
.gatedExecution()
|
||||||
|
.addFirstStep(
|
||||||
|
"generate-ranges",
|
||||||
|
"Generate data ranges to reindex",
|
||||||
|
ChunkRangeJson.class,
|
||||||
|
myReindexGenerateRangeChunkStep)
|
||||||
|
.addIntermediateStep(
|
||||||
|
"load-ids",
|
||||||
|
"Load IDs of resources to reindex",
|
||||||
|
ResourceIdListWorkChunkJson.class,
|
||||||
|
myReindexLoadIdsStep)
|
||||||
|
.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ReindexStepV1 reindexStepV1() {
|
||||||
|
return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean("reindexGenerateRangeChunkStepV1")
|
||||||
|
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
|
||||||
|
return new GenerateRangeChunksStep<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean("reindexLoadIdsStepV1")
|
||||||
|
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
|
||||||
|
IBatch2DaoSvc theBatch2DaoSvc) {
|
||||||
|
return new LoadIdsStep<>(theBatch2DaoSvc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ReindexJobParametersValidatorV1 reindexJobParametersValidatorV1(IBatch2DaoSvc theBatch2DaoSvc) {
|
||||||
|
return new ReindexJobParametersValidatorV1(
|
||||||
|
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
package ca.uhn.fhir.batch2.jobs.reindex.v2;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
||||||
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||||
|
@ -7,6 +7,8 @@ import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
|
||||||
import ca.uhn.fhir.batch2.api.RunOutcome;
|
import ca.uhn.fhir.batch2.api.RunOutcome;
|
||||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||||
import ca.uhn.fhir.batch2.api.VoidModel;
|
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
|
@ -17,10 +17,11 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
package ca.uhn.fhir.batch2.jobs.reindex.v2;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
|
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
|
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import jakarta.annotation.Nonnull;
|
import jakarta.annotation.Nonnull;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
|
@ -28,11 +29,11 @@ import jakarta.annotation.Nullable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class ReindexJobParametersValidator implements IJobParametersValidator<ReindexJobParameters> {
|
public class ReindexJobParametersValidatorV2 implements IJobParametersValidator<ReindexJobParameters> {
|
||||||
|
|
||||||
private final IUrlListValidator myUrlListValidator;
|
private final IUrlListValidator myUrlListValidator;
|
||||||
|
|
||||||
public ReindexJobParametersValidator(IUrlListValidator theUrlListValidator) {
|
public ReindexJobParametersValidatorV2(IUrlListValidator theUrlListValidator) {
|
||||||
myUrlListValidator = theUrlListValidator;
|
myUrlListValidator = theUrlListValidator;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
package ca.uhn.fhir.batch2.jobs.reindex.v2;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
import ca.uhn.fhir.batch2.api.IJobDataSink;
|
||||||
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||||
|
@ -7,6 +7,9 @@ import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
|
||||||
import ca.uhn.fhir.batch2.api.RunOutcome;
|
import ca.uhn.fhir.batch2.api.RunOutcome;
|
||||||
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
|
||||||
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexTask;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
||||||
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
||||||
|
@ -14,16 +17,29 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||||
import jakarta.annotation.Nonnull;
|
import jakarta.annotation.Nonnull;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class ReindexStepV2 extends BaseReindexStep
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES;
|
||||||
|
|
||||||
|
public class ReindexStepV2
|
||||||
implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {
|
implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {
|
||||||
|
|
||||||
private final ReindexJobService myReindexJobService;
|
private final ReindexJobService myReindexJobService;
|
||||||
|
private final HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
private final IFhirSystemDao<?, ?> mySystemDao;
|
||||||
|
|
||||||
|
private final DaoRegistry myDaoRegistry;
|
||||||
|
|
||||||
|
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
||||||
|
|
||||||
|
|
||||||
public ReindexStepV2(
|
public ReindexStepV2(
|
||||||
ReindexJobService theJobService,
|
ReindexJobService theJobService,
|
||||||
|
@ -31,7 +47,10 @@ public class ReindexStepV2 extends BaseReindexStep
|
||||||
IFhirSystemDao<?, ?> theSystemDao,
|
IFhirSystemDao<?, ?> theSystemDao,
|
||||||
DaoRegistry theRegistry,
|
DaoRegistry theRegistry,
|
||||||
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
|
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
|
||||||
super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
|
myDaoRegistry = theRegistry;
|
||||||
|
myHapiTransactionService = theHapiTransactionService;
|
||||||
|
mySystemDao = theSystemDao;
|
||||||
|
myIdHelperService = theIdHelperService;
|
||||||
myReindexJobService = theJobService;
|
myReindexJobService = theJobService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,4 +88,33 @@ public class ReindexStepV2 extends BaseReindexStep
|
||||||
|
|
||||||
return new RunOutcome(data.size());
|
return new RunOutcome(data.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ReindexResults doReindex(
|
||||||
|
ResourceIdListWorkChunkJson data,
|
||||||
|
IJobDataSink<?> theDataSink,
|
||||||
|
String theInstanceId,
|
||||||
|
String theChunkId,
|
||||||
|
ReindexJobParameters theJobParameters) {
|
||||||
|
RequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
requestDetails.setRetry(true);
|
||||||
|
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
|
||||||
|
|
||||||
|
TransactionDetails transactionDetails = new TransactionDetails();
|
||||||
|
ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
|
||||||
|
jp.setData(data)
|
||||||
|
.setRequestDetails(requestDetails)
|
||||||
|
.setTransactionDetails(transactionDetails)
|
||||||
|
.setDataSink(theDataSink)
|
||||||
|
.setInstanceId(theInstanceId)
|
||||||
|
.setChunkId(theChunkId)
|
||||||
|
.setJobParameters(theJobParameters);
|
||||||
|
|
||||||
|
ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
|
||||||
|
|
||||||
|
return myHapiTransactionService
|
||||||
|
.withRequest(requestDetails)
|
||||||
|
.withTransactionDetails(transactionDetails)
|
||||||
|
.withRequestPartitionId(data.getRequestPartitionId())
|
||||||
|
.execute(reindexJob);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
package ca.uhn.fhir.batch2.jobs.reindex.v2;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.batch2.api.IJobStepWorker;
|
||||||
|
import ca.uhn.fhir.batch2.api.VoidModel;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
|
||||||
|
import ca.uhn.fhir.batch2.model.JobDefinition;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||||
|
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||||
|
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
|
||||||
|
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
|
||||||
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class ReindexV2Config {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ReindexJobService myReindexJobService;
|
||||||
|
@Autowired
|
||||||
|
private HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IFhirSystemDao<?, ?> mySystemDao;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DaoRegistry myRegistry;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("reindexGenerateRangeChunkStepV2")
|
||||||
|
private IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> myReindexGenerateRangeChunkStep;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("reindexLoadIdsStepV2")
|
||||||
|
private IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> myReindexLoadIdsStep;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ReindexJobParametersValidatorV2 myReindexJobParametersValidator;
|
||||||
|
|
||||||
|
|
||||||
|
// Version 2
|
||||||
|
@Bean
|
||||||
|
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2() {
|
||||||
|
return JobDefinition.newBuilder()
|
||||||
|
.setJobDefinitionId(JOB_REINDEX)
|
||||||
|
.setJobDescription("Reindex resources")
|
||||||
|
.setJobDefinitionVersion(2)
|
||||||
|
.setParametersType(ReindexJobParameters.class)
|
||||||
|
.setParametersValidator(myReindexJobParametersValidator)
|
||||||
|
.gatedExecution()
|
||||||
|
.addFirstStep(
|
||||||
|
"generate-ranges",
|
||||||
|
"Generate data ranges to reindex",
|
||||||
|
ChunkRangeJson.class,
|
||||||
|
myReindexGenerateRangeChunkStep)
|
||||||
|
.addIntermediateStep(
|
||||||
|
"load-ids",
|
||||||
|
"Load IDs of resources to reindex",
|
||||||
|
ResourceIdListWorkChunkJson.class,
|
||||||
|
myReindexLoadIdsStep)
|
||||||
|
.addIntermediateStep(
|
||||||
|
"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
|
||||||
|
.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public CheckPendingReindexWorkStep pendingWorkStep() {
|
||||||
|
return new CheckPendingReindexWorkStep(myReindexJobService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ReindexStepV2 reindexStepV2() {
|
||||||
|
return new ReindexStepV2(myReindexJobService, myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean("reindexGenerateRangeChunkStepV2")
|
||||||
|
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
|
||||||
|
return new GenerateRangeChunksStep<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean("reindexLoadIdsStepV2")
|
||||||
|
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
|
||||||
|
IBatch2DaoSvc theBatch2DaoSvc) {
|
||||||
|
return new LoadIdsStep<>(theBatch2DaoSvc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ReindexJobParametersValidatorV2 reindexJobParametersValidatorV2(IBatch2DaoSvc theBatch2DaoSvc) {
|
||||||
|
return new ReindexJobParametersValidatorV2(
|
||||||
|
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package ca.uhn.fhir.batch2.jobs.reindex;
|
package ca.uhn.fhir.batch2.jobs.reindex;
|
||||||
|
|
||||||
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
|
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
|
||||||
|
import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexJobParametersValidatorV1;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
@ -19,7 +20,7 @@ public class ReindexTaskParametersValidatorTest {
|
||||||
private UrlListValidator myListValidator;
|
private UrlListValidator myListValidator;
|
||||||
|
|
||||||
@InjectMocks
|
@InjectMocks
|
||||||
private ReindexJobParametersValidator myValidator;
|
private ReindexJobParametersValidatorV1 myValidator;
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(strings = { "\n", " ", "\t" })
|
@ValueSource(strings = { "\n", " ", "\t" })
|
||||||
|
|
Loading…
Reference in New Issue