diff --git a/hapi-fhir-checkstyle/src/checkstyle/hapi-base-checkstyle.xml b/hapi-fhir-checkstyle/src/checkstyle/hapi-base-checkstyle.xml index c74e8b7e01c..11c9b669ad4 100644 --- a/hapi-fhir-checkstyle/src/checkstyle/hapi-base-checkstyle.xml +++ b/hapi-fhir-checkstyle/src/checkstyle/hapi-base-checkstyle.xml @@ -64,4 +64,12 @@ + + + + + + + + diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6285-batch2-reindex-version2-added.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6285-batch2-reindex-version2-added.yaml new file mode 100644 index 00000000000..2e48e6e7bd9 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6285-batch2-reindex-version2-added.yaml @@ -0,0 +1,17 @@ +--- +type: fix +issue: 6285 +title: "Updated the Reindex Batch2 job to allow + for an additional step that will check to ensure + that no pending 'reindex' work is needed. + This was done to prevent a bug in which + value set expansion would not return all + the existing CodeSystem Concepts after + a reindex call, due to some of the concepts + being deferred to future job runs. + + As such, `$reindex` operations on CodeSystems + will no longer result in incorrect value set + expansion when such an expansion is called + 'too soon' after a $reindex operation. +" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index fbd583d0090..ebe9c4aa574 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.dao; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobPartitionProvider; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirVersionEnum; @@ -158,6 +157,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static java.util.Objects.isNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -1315,7 +1315,7 @@ public abstract class BaseHapiFhirResourceDao extends B myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl); JobInstanceStartRequest request = new JobInstanceStartRequest(); - request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + request.setJobDefinitionId(JOB_REINDEX); request.setParameters(params); myJobCoordinator.startInstance(theRequestDetails, request); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java index 5e8f6b053bb..270238da3fb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java @@ -27,7 +27,11 @@ import ca.uhn.fhir.context.support.IValidationSupport.CodeValidationResult; import ca.uhn.fhir.context.support.LookupCodeRequest; import ca.uhn.fhir.context.support.ValidationSupportContext; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.entity.ResourceTable; @@ -176,6 +180,47 @@ public class JpaResourceDaoCodeSystem extends BaseHapiF myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete); } + /** + * If there are more code systems to process + * than {@link JpaStorageSettings#getDeferIndexingForCodesystemsOfSize()}, + * then these codes will have their processing deferred (for a later time). + * + * This can result in future reindex steps *skipping* these code systems (if + * they're still deferred) and thus incorrect expansions resulting. + * + * So we override the reindex method for CodeSystems specifically to + * force reindex batch jobs to wait until all code systems are processed before + * moving on. + */ + @SuppressWarnings("rawtypes") + @Override + public ReindexOutcome reindex( + IResourcePersistentId thePid, + ReindexParameters theReindexParameters, + RequestDetails theRequest, + TransactionDetails theTransactionDetails) { + ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails); + + if (outcome.getWarnings().isEmpty()) { + outcome.setHasPendingWork(true); + } + return outcome; + } + + @Override + public ReindexJobStatus getReindexJobStatus() { + boolean isQueueEmpty = myTermDeferredStorageSvc.isStorageQueueEmpty(true); + + ReindexJobStatus status = new ReindexJobStatus(); + status.setHasReindexWorkPending(!isQueueEmpty); + if (status.isHasReindexWorkPending()) { + // force a run + myTermDeferredStorageSvc.saveDeferred(); + } + + return status; + } + @Override public ResourceTable updateEntity( RequestDetails theRequest, diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java index af01a692a32..433f3f5d01c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermCodeSystemStorageSvcImpl.java @@ -593,7 +593,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc { if (theStatisticsTracker.getUpdatedConceptCount() <= myStorageSettings.getDeferIndexingForCodesystemsOfSize()) { saveConcept(conceptToAdd); Long nextConceptPid = conceptToAdd.getId(); - Validate.notNull(nextConceptPid); + Objects.requireNonNull(nextConceptPid); } else { myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index f5e1b95d49c..47aafd066dd 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.term; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; -import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; @@ -79,6 +78,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1; private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5; private boolean myAllowDeferredTasksTimeout = true; + private static final List BATCH_JOBS_TO_CARE_ABOUT = + List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME); private final List myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>()); private final Queue myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>(); private final List myDeferredConcepts = Collections.synchronizedList(new ArrayList<>()); @@ -436,7 +437,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas return retVal; } - private boolean isJobsExecuting() { + public boolean isJobsExecuting() { cleanseEndedJobs(); return !myJobExecutions.isEmpty(); @@ -448,15 +449,18 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas * This is mostly a fail-safe * because "cancelled" jobs are never removed. */ - List executions = new ArrayList<>(myJobExecutions); List idsToDelete = new ArrayList<>(); - for (String id : executions) { - // TODO - might want to consider a "fetch all instances" - JobInstance instance = myJobCoordinator.getInstance(id); - if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) { + for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) { + List jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus( + jobId, + true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only) + Math.max(myJobExecutions.size(), 1), // at most this many + 0); + for (JobInstance instance : jobInstanceInEndedState) { idsToDelete.add(instance.getInstanceId()); } } + for (String id : idsToDelete) { myJobExecutions.remove(id); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermDeferredStorageSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermDeferredStorageSvc.java index 62fb8d8957a..6b8c3ad0dab 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermDeferredStorageSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/api/ITermDeferredStorageSvc.java @@ -78,6 +78,8 @@ public interface ITermDeferredStorageSvc { void logQueueForUnitTest(); + boolean isJobsExecuting(); + /** * Only to be used from tests - Disallow test timeouts on deferred tasks */ diff --git a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/ResourceProviderCustomSearchParamDstu3Test.java b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/ResourceProviderCustomSearchParamDstu3Test.java index 9dcf6439dbb..6e1511543c8 100644 --- a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/ResourceProviderCustomSearchParamDstu3Test.java +++ b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/ResourceProviderCustomSearchParamDstu3Test.java @@ -1,14 +1,11 @@ package ca.uhn.fhir.jpa.provider.dstu3; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; -import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.entity.ResourceTable; +import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.IBundleProvider; @@ -46,9 +43,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; @@ -202,7 +201,7 @@ public class ResourceProviderCustomSearchParamDstu3Test extends BaseResourceProv mySearchParameterDao.create(fooSp, mySrd); runInTransaction(()->{ - List allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); + List allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX); assertEquals(1, allJobs.size()); assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size()); assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ComboUniqueParamTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ComboUniqueParamTest.java index 05f4c5a5e24..b46d8b964d6 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ComboUniqueParamTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ComboUniqueParamTest.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED; import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED; import static org.assertj.core.api.Assertions.assertThat; @@ -1075,7 +1076,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test parameters.addUrl(url); } JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest); ourLog.info("Started reindex job with id {}", res.getInstanceId()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java index d94b0089337..98eeb191a21 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java @@ -23,7 +23,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri; import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.util.SearchParamHash; import ca.uhn.fhir.jpa.model.util.UcumServiceUtil; -import ca.uhn.fhir.jpa.reindex.ReindexStepTest; +import ca.uhn.fhir.jpa.reindex.ReindexStepV1Test; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.rest.param.BaseParam; @@ -57,6 +57,7 @@ import org.springframework.data.jpa.repository.JpaRepository; import java.util.List; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -312,7 +313,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test { parameters.addUrl(url); } JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest); ourLog.info("Started reindex job with id {}", res.getInstanceId()); @@ -321,7 +322,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test { // Additional existing tests with enabled IndexStorageOptimized @Nested - public class IndexStorageOptimizedReindexStepTest extends ReindexStepTest { + public class IndexStorageOptimizedReindexStepTestV1 extends ReindexStepV1Test { @BeforeEach void setUp() { myStorageSettings.setIndexStorageOptimized(true); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java index 52735147cd9..9f5ed69f7a9 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java @@ -2,12 +2,15 @@ package ca.uhn.fhir.jpa.dao.r4; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep; +import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.support.ValidationSupportContext; import ca.uhn.fhir.context.support.ValueSetExpansionOptions; @@ -18,13 +21,13 @@ import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum; import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao; -import ca.uhn.fhir.jpa.reindex.ReindexTestHelper; import ca.uhn.fhir.jpa.entity.TermValueSet; import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum; import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; +import ca.uhn.fhir.jpa.reindex.ReindexTestHelper; import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; @@ -146,7 +149,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test @Autowired private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc; @Autowired - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Autowired private DeleteExpungeStep myDeleteExpungeStep; @Autowired @@ -1018,7 +1021,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // insert to: HFJ_RESOURCE, HFJ_RES_VER, HFJ_RES_LINK assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); - } @ParameterizedTest @@ -1031,7 +1033,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test }) public void testReindexJob_OptimizeStorage(boolean theOptimisticLock, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageModeEnum, int theExpectedSelectCount, int theExpectedUpdateCount) { // Setup - ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(); IIdType patientId = createPatient(withActiveTrue()); IIdType orgId = createOrganization(withName("MY ORG")); @@ -1056,7 +1057,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // execute myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class)); // validate assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(theExpectedSelectCount); @@ -1064,7 +1072,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertThat(myCaptureQueriesListener.getInsertQueriesForCurrentThread()).isEmpty(); assertThat(myCaptureQueriesListener.getDeleteQueriesForCurrentThread()).isEmpty(); assertEquals(10, outcome.getRecordsProcessed()); - } @Test @@ -1095,7 +1102,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // execute myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class)); assertEquals(20, outcome.getRecordsProcessed()); // validate @@ -1103,10 +1117,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size()); assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size()); assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size()); - } - public void assertNoPartitionSelectors() { List selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread(); for (SqlQuery next : selectQueries) { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchCustomSearchParamTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchCustomSearchParamTest.java index bff403a5ea4..a9f3cc57519 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchCustomSearchParamTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchCustomSearchParamTest.java @@ -1,6 +1,5 @@ package ca.uhn.fhir.jpa.dao.r4; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.phonetic.PhoneticEncoderEnum; @@ -22,7 +21,6 @@ import ca.uhn.fhir.rest.param.NumberParam; import ca.uhn.fhir.rest.param.ReferenceOrListParam; import ca.uhn.fhir.rest.param.ReferenceParam; import ca.uhn.fhir.rest.param.StringParam; -import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; @@ -70,6 +68,7 @@ import org.springframework.transaction.support.TransactionTemplate; import java.util.List; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.apache.commons.lang3.StringUtils.countMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -441,11 +440,11 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL); fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE); - List initialJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); + List initialJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX); mySearchParameterDao.create(fooSp, mySrd); - List finalJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); + List finalJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX); List newJobs = finalJobs.stream().filter(t -> !initialJobs.contains(t)).collect(Collectors.toList()); assertThat(newJobs.size()).as("number of jobs created").isEqualTo(1); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java similarity index 92% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java index 69ed1e1507c..539c4590a4c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java @@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class ReindexStepTest { +public class ReindexStepV1Test { @Mock private HapiTransactionService myHapiTransactionService; @@ -34,7 +34,7 @@ public class ReindexStepTest { private IJobDataSink myDataSink; @InjectMocks - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Captor private ArgumentCaptor builderArgumentCaptor; @@ -51,7 +51,7 @@ public class ReindexStepTest { when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod(); // when - myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters); + myReindexStepV1.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters); // then assertMethodArgumentRequestPartitionId(expectedPartitionId); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderCustomSearchParamR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderCustomSearchParamR4Test.java index 987f3430873..0bee635ccc0 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderCustomSearchParamR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderCustomSearchParamR4Test.java @@ -1,8 +1,5 @@ package ca.uhn.fhir.jpa.provider.r4; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.i18n.Msg; @@ -60,9 +57,11 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; @@ -244,7 +243,7 @@ public class ResourceProviderCustomSearchParamR4Test extends BaseResourceProvide runInTransaction(() -> { myBatch2JobHelper.forceRunMaintenancePass(); - List allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); + List allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX); assertEquals(1, allJobs.size()); assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size()); assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java similarity index 80% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java index d7ed24e6789..8075789db8d 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java @@ -2,10 +2,13 @@ package ca.uhn.fhir.jpa.reindex; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; @@ -25,13 +28,14 @@ import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED; import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class ReindexStepTest extends BaseJpaR4Test { +public class ReindexStepV1Test extends BaseJpaR4Test { @Autowired - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Mock private IJobDataSink myDataSink; @@ -46,9 +50,7 @@ public class ReindexStepTest extends BaseJpaR4Test { @Test public void testReindex_NoActionNeeded() { - // Setup - Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong(); @@ -57,9 +59,19 @@ public class ReindexStepTest extends BaseJpaR4Test { data.addTypedPid("Patient", id1); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + instance.setInstanceId("index-id"); + WorkChunk chunk = new WorkChunk(); + chunk.setId("chunk-id"); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + chunk + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -72,12 +84,9 @@ public class ReindexStepTest extends BaseJpaR4Test { assertEquals(0, myCaptureQueriesListener.getRollbackCount()); } - @Test public void testReindex_NoActionNeeded_IndexMissingFieldsEnabled() { - // Setup - myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED); Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); @@ -88,9 +97,16 @@ public class ReindexStepTest extends BaseJpaR4Test { data.addTypedPid("Patient", id1); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -121,9 +137,16 @@ public class ReindexStepTest extends BaseJpaR4Test { }); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -136,12 +159,9 @@ public class ReindexStepTest extends BaseJpaR4Test { assertEquals(0, myCaptureQueriesListener.getRollbackCount()); } - @Test public void testReindex_IndexesAddedAndRemoved_IndexMissingFieldsEnabled() { - // Setup - myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED); boolean markResourcesForReindexingUponSearchParameterChange = myStorageSettings.isMarkResourcesForReindexingUponSearchParameterChange(); myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false); // if this were true, it would set up a lot of reindex jobs extraneous to the one we're trying to test @@ -189,9 +209,16 @@ public class ReindexStepTest extends BaseJpaR4Test { mySearchParamRegistry.forceRefresh(); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -207,9 +234,7 @@ public class ReindexStepTest extends BaseJpaR4Test { @Test public void testReindex_OneResourceReindexFailedButOthersSucceeded() { - // Setup - Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong(); Long idPatientToInvalidate = createPatient().getIdPartAsLong(); @@ -234,9 +259,19 @@ public class ReindexStepTest extends BaseJpaR4Test { }); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + instance.setInstanceId("index-id"); + WorkChunk workChunk = new WorkChunk(); + workChunk.setId("workid"); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + workChunk + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(4, outcome.getRecordsProcessed()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java similarity index 95% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java index fa9d93fb4ab..c04a40f1ac7 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java @@ -2,7 +2,6 @@ package ca.uhn.fhir.jpa.reindex; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobPersistence; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; @@ -39,6 +38,7 @@ import java.util.Date; import java.util.List; import java.util.stream.Stream; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @SuppressWarnings("SqlDialectInspection") -public class ReindexJobTest extends BaseJpaR4Test { +public class ReindexTaskTest extends BaseJpaR4Test { @Autowired private IJobCoordinator myJobCoordinator; @@ -101,7 +101,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters( new ReindexJobParameters() .setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION) @@ -158,7 +158,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters( new ReindexJobParameters() .setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS) @@ -217,7 +217,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters( new ReindexJobParameters() .setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS) @@ -252,7 +252,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters( new ReindexJobParameters() .setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION) @@ -294,7 +294,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest); myBatch2JobHelper.awaitJobCompletion(res); @@ -325,7 +325,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest); JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res); @@ -356,7 +356,7 @@ public class ReindexJobTest extends BaseJpaR4Test { parameters.addUrl("Observation?status=final"); JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest); myBatch2JobHelper.awaitJobCompletion(res); @@ -387,7 +387,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest); myBatch2JobHelper.awaitJobCompletion(startResponse); @@ -406,7 +406,7 @@ public class ReindexJobTest extends BaseJpaR4Test { DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter(); JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse); @@ -436,7 +436,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // Run a reindex JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999); @@ -469,7 +469,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // Run a reindex JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999); @@ -500,7 +500,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest); JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse); @@ -528,7 +528,7 @@ public class ReindexJobTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(new ReindexJobParameters()); Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); JobInstance outcome = myBatch2JobHelper.awaitJobFailure(startResponse); @@ -541,7 +541,7 @@ public class ReindexJobTest extends BaseJpaR4Test { @Test public void testReindex_withReindexingUponSearchParameterChangeEnabled_reindexJobCompleted() { - List jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0); + List jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0); assertThat(jobInstances).isEmpty(); // make sure the resources auto-reindex after the search parameter update is enabled @@ -552,7 +552,7 @@ public class ReindexJobTest extends BaseJpaR4Test { myReindexTestHelper.createCodeSearchParameter(); // check that reindex job was created - jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0); + jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0); assertThat(jobInstances).hasSize(1); // check that the job is completed (not stuck in QUEUED status) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java similarity index 97% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java index 8a643006439..35657c3ab47 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java @@ -1,9 +1,8 @@ package ca.uhn.fhir.jpa.reindex; import ca.uhn.fhir.batch2.api.IJobCoordinator; -import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; +import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.interceptor.model.RequestPartitionId; @@ -25,9 +24,10 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.List; import java.util.stream.Stream; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.assertj.core.api.Assertions.assertThat; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { +public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test { @Autowired private IJobCoordinator myJobCoordinator; @@ -133,7 +133,7 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { // execute JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java index 1f9f229c72a..d6f5039d129 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java @@ -1,5 +1,8 @@ package ca.uhn.fhir.jpa.term; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils; +import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.support.ConceptValidationOptions; @@ -9,6 +12,7 @@ import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet; +import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.entity.TermCodeSystem; import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; import ca.uhn.fhir.jpa.entity.TermConcept; @@ -25,7 +29,10 @@ import ca.uhn.fhir.jpa.term.api.ITermReadSvc; import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet; import ca.uhn.fhir.jpa.util.SqlQuery; import ca.uhn.fhir.jpa.util.ValueSetTestUtil; +import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import com.google.common.collect.Lists; @@ -51,10 +58,16 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -2081,7 +2094,172 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet outcome = myValueSetDao.validateCode(vs.getUrlElement(), null, new StringType("A"), cs.getUrlElement(), null, null, null, mySrd); assertEquals(false, outcome.isOk()); assertThat(outcome.getMessage()).contains("Code validation occurred using a ValueSet expansion that was pre-calculated"); - } + @Test + public void reindexCodeSystems_withDeferredCodeSystems_reindexesAllCodeSystems() { + // setup + int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize(); + + try { + /** + * The deferred count must be less than the number of + * concepts we are going to be uploading. + * That way, when we do the reindex, it will defer + * the additional code systems for a later job run. + * + * See {@link TermCodeSystemStorageSvcImpl#addConceptInHierarchy(TermCodeSystemVersion, Collection, TermConcept, UploadStatistics, Map, int)} + * + * Our CodeSystem below only has 6 Concepts to add. + * So we'll set the deferred count to 3 (so 3 will be deferred) + */ + myStorageSettings.setDeferIndexingForCodesystemsOfSize(3); + /* + * We're also setting our retry delay to a short timeframe + * so this test doesn't run too long. + */ + ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS)); + + IParser parser = myFhirContext.newJsonParser(); + + RequestDetails rq = new SystemRequestDetails(); + CodeSystem cs; + ValueSet vs; + String csStr; + { + String vsStr = """ + { + "resourceType": "ValueSet", + "id": "0447bffa-01fa-4405-828a-96192e74a5d8", + "meta": { + "versionId": "2", + "lastUpdated": "2024-04-09T15:06:24.025+00:00", + "source": "#f4491e490a6a2900" + }, + "url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types", + "version": "1.0.0", + "name": "IDMS-SUBMISSION-TYPES", + "title": "IDMS Submission Types", + "status": "active", + "experimental": false, + "date": "2023-09-28", + "publisher": "IDMS", + "description": "List of Submission Types", + "compose": { + "include": [ + { + "system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types" + } + ] + } + } + """; + vs = parser.parseResource(ValueSet.class, vsStr); + csStr = """ + { + "resourceType": "CodeSystem", + "id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3", + "meta": { + "versionId": "14", + "lastUpdated": "2024-06-03T17:49:56.580+00:00", + "source": "#261a82258b0978a8" + }, + "url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types", + "version": "1.0.0", + "name": "IDMS-Internal-Submission-Types", + "status": "active", + "date": "2023-09-07", + "publisher": "IDMS", + "description": "This contains a lists of codes Submission Type Codes.", + "content": "complete", + "concept": [ + { + "code": "SUB-BRAND-PRODUCT", + "display": "New Brand Product (Non-New Chemical Entity)" + }, + { + "code": "SUB-CLASSIFICATION", + "display": "Classification Change" + }, + { + "code": "SUB-CLINICIAN-LED-SUBMISSIONS", + "display": "Clinician-Led Submissions" + }, + { + "code": "SUB-DELISTING", + "display": "Delisting" + }, + { + "code": "SUB-DIN-CHANGE", + "display": "Drug Identification Number (DIN) Change" + }, + { + "code": "SUB-DISTRIBUTOR", + "display": "Distributor Change" + } + ] + } + """; + cs = parser.parseResource(CodeSystem.class, csStr); + } + + // create our ValueSet + myValueSetDao.update(vs, rq); + + // and the code system + myCodeSystemDao.update(cs, rq); + + // sanity check to make sure our code system was actually created + SearchParameterMap spMap = new SearchParameterMap(); + spMap.setLoadSynchronous(true); + IBundleProvider bp = myCodeSystemDao.search(spMap, rq); + assertEquals(1, bp.getAllResources().size()); + IBaseResource baseResource = bp.getAllResources().get(0); + CodeSystem cssaved; + if (baseResource instanceof CodeSystem saved) { + cssaved = saved; + } else { + fail("Should be a code system"); + return; + } + assertEquals(cs.getConcept().size(), cssaved.getConcept().size()); + + // test + // perform the reindex (we'll only target the CodeSystem here) + ReindexJobParameters params = new ReindexJobParameters(); + params.addUrl("CodeSystem?"); + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(JOB_REINDEX); + startRequest.setParameters(params); + + // and wait for it to complete + Batch2JobStartResponse response = myJobCoordinator.startInstance(rq, startRequest); + myBatch2JobHelper.awaitJobCompletion(response); + + // verify by doing the value expansion + ValueSetExpansionOptions options = new ValueSetExpansionOptions(); + options.setCount(200); // this is way more than exist, so it's ok + ValueSet expanded = myValueSetDao.expand(vs, options); + assertNotNull(expanded); + + /* + * If the reindex was performed correctly, the expanded ValueSet + * should contain all the CodeSystem concepts that we originally + * uploaded (and nothing else). + */ + HashSet all = new HashSet<>(); + for (CodeSystem.ConceptDefinitionComponent set : cs.getConcept()) { + all.add(set.getCode()); + } + for (ValueSet.ValueSetExpansionContainsComponent v : expanded.getExpansion().getContains()) { + all.remove(v.getCode()); + } + assertTrue(all.isEmpty(), String.join(", ", all)); + assertEquals(cs.getConcept().size(), expanded.getExpansion().getTotal()); + } finally { + // set back to standard values + myStorageSettings.setDeferIndexingForCodesystemsOfSize(deferredIndexingDefault); + ReindexUtils.setRetryDelay(null); + } + } } diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/DuplicateIndexR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/DuplicateIndexR5Test.java index d88d136d089..2a4387b4597 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/DuplicateIndexR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/DuplicateIndexR5Test.java @@ -1,6 +1,5 @@ package ca.uhn.fhir.jpa.dao.r5; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; @@ -19,6 +18,7 @@ import org.hl7.fhir.r5.model.Reference; import org.hl7.fhir.r5.model.SearchParameter; import org.junit.jupiter.api.Test; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.junit.jupiter.api.Assertions.assertEquals; public class DuplicateIndexR5Test extends BaseJpaR5Test { @@ -149,7 +149,7 @@ public class DuplicateIndexR5Test extends BaseJpaR5Test { ReindexJobParameters parameters = new ReindexJobParameters(); parameters.addUrl("Patient?"); JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); startRequest.setParameters(parameters); Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest); myBatch2JobHelper.awaitJobCompletion(res.getInstanceId()); diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/provider/r5/ResourceProviderR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/provider/r5/ResourceProviderR5Test.java index f6f26c10fc9..4ee5ea481c4 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/provider/r5/ResourceProviderR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/provider/r5/ResourceProviderR5Test.java @@ -1,7 +1,6 @@ package ca.uhn.fhir.jpa.provider.r5; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.interceptor.model.RequestPartitionId; @@ -67,6 +66,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.apache.commons.lang3.StringUtils.leftPad; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -374,7 +374,7 @@ public class ResourceProviderR5Test extends BaseResourceProviderR5Test { ReindexJobParameters jobParameters = new ReindexJobParameters(); jobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(RequestPartitionId.allPartitions())); JobInstanceStartRequest request = new JobInstanceStartRequest(); - request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + request.setJobDefinitionId(JOB_REINDEX); request.setParameters(jobParameters); Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request); diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java index 2fb5a9b87f6..8b98ddfe30e 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.jpa.test; +import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.jobs.export.BulkDataExportProvider; import ca.uhn.fhir.context.FhirContext; @@ -559,6 +560,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil @Autowired protected IJobMaintenanceService myJobMaintenanceService; + @Autowired + protected IJobCoordinator myJobCoordinator; @RegisterExtension private final PreventDanglingInterceptorsExtension myPreventDanglingInterceptorsExtension = new PreventDanglingInterceptorsExtension(()-> myInterceptorRegistry); diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/PatientReindexTestHelper.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/PatientReindexTestHelper.java index 346ecfc519f..bb063387b89 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/PatientReindexTestHelper.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/PatientReindexTestHelper.java @@ -38,6 +38,7 @@ import org.junit.jupiter.params.provider.Arguments; import java.util.List; import java.util.stream.Stream; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -170,7 +171,7 @@ public class PatientReindexTestHelper { private JobInstanceStartRequest createPatientReindexRequest(int theBatchSize) { JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setJobDefinitionId(JOB_REINDEX); ReindexJobParameters reindexJobParameters = new ReindexJobParameters(); reindexJobParameters.setBatchSize(Math.max(theBatchSize,1)); diff --git a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java index eed255e7842..2365300b65a 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java +++ b/hapi-fhir-jpaserver-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImplTest.java @@ -17,11 +17,15 @@ import org.springframework.test.util.ReflectionTestUtils; import org.springframework.transaction.PlatformTransactionManager; import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.times; @@ -67,8 +71,15 @@ public class TermDeferredStorageSvcImplTest { ReflectionTestUtils.setField(mySvc, "myJobExecutions", mockExecutions); - when(myJobCoordinator.getInstance(eq(jobId))) - .thenReturn(instance); + when(myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus( + eq(TERM_CODE_SYSTEM_DELETE_JOB_NAME), + eq(true), + anyInt(), + eq(0) + )) + .thenReturn(List.of()) // first nothing + .thenReturn(List.of(instance)); // then the list with the instance + assertFalse(mySvc.isStorageQueueEmpty(true)); instance.setStatus(StatusEnum.COMPLETED); assertTrue(mySvc.isStorageQueueEmpty(true)); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java index a49d054bf39..ba335b1f60b 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java @@ -21,69 +21,37 @@ package ca.uhn.fhir.batch2.jobs.reindex; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobPartitionProvider; -import ca.uhn.fhir.batch2.api.IJobStepWorker; -import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; -import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; -import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; -import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; -import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexV1Config; +import ca.uhn.fhir.batch2.jobs.reindex.v2.ReindexV2Config; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; -import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; @Configuration +@Import({ReindexV1Config.class, ReindexV2Config.class}) public class ReindexAppCtx { - public static final String JOB_REINDEX = "REINDEX"; + @Autowired + private HapiTransactionService myHapiTransactionService; - @Bean - public JobDefinition reindexJobDefinition(IBatch2DaoSvc theBatch2DaoSvc) { - return JobDefinition.newBuilder() - .setJobDefinitionId(JOB_REINDEX) - .setJobDescription("Reindex resources") - .setJobDefinitionVersion(1) - .setParametersType(ReindexJobParameters.class) - .setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc)) - .gatedExecution() - .addFirstStep( - "generate-ranges", - "Generate data ranges to reindex", - ChunkRangeJson.class, - reindexGenerateRangeChunksStep()) - .addIntermediateStep( - "load-ids", - "Load IDs of resources to reindex", - ResourceIdListWorkChunkJson.class, - reindexLoadIdsStep(theBatch2DaoSvc)) - .addLastStep("reindex", "Perform the resource reindex", reindexStep()) - .build(); - } + @Autowired + private IFhirSystemDao mySystemDao; - @Bean - public IJobStepWorker reindexGenerateRangeChunksStep() { - return new GenerateRangeChunksStep<>(); - } + @Autowired + private DaoRegistry myRegistry; - @Bean - public IJobStepWorker reindexLoadIdsStep( - IBatch2DaoSvc theBatch2DaoSvc) { - return new LoadIdsStep<>(theBatch2DaoSvc); - } + @Autowired + private IIdHelperService> myIdHelperService; - @Bean - public ReindexJobParametersValidator reindexJobParametersValidator(IBatch2DaoSvc theBatch2DaoSvc) { - return new ReindexJobParametersValidator( - new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc)); - } - - @Bean - public ReindexStep reindexStep() { - return new ReindexStep(); - } + /* Shared services */ @Bean public ReindexProvider reindexProvider( @@ -92,4 +60,9 @@ public class ReindexAppCtx { IJobPartitionProvider theJobPartitionHandler) { return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler); } + + @Bean + public ReindexJobService jobService() { + return new ReindexJobService(myRegistry); + } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java index 5889a75d4e9..a8f8e08707c 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexProvider.java @@ -44,6 +44,7 @@ import java.util.stream.Collectors; import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE; import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS; +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; public class ReindexProvider { @@ -127,7 +128,7 @@ public class ReindexProvider { myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl); JobInstanceStartRequest request = new JobInstanceStartRequest(); - request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + request.setJobDefinitionId(JOB_REINDEX); request.setParameters(params); Batch2JobStartResponse response = myJobCoordinator.startInstance(theRequestDetails, request); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java deleted file mode 100644 index 2644f3af89f..00000000000 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java +++ /dev/null @@ -1,203 +0,0 @@ -/*- - * #%L - * hapi-fhir-storage-batch2-jobs - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.reindex; - -import ca.uhn.fhir.batch2.api.IJobDataSink; -import ca.uhn.fhir.batch2.api.IJobStepWorker; -import ca.uhn.fhir.batch2.api.JobExecutionFailedException; -import ca.uhn.fhir.batch2.api.RunOutcome; -import ca.uhn.fhir.batch2.api.StepExecutionDetails; -import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.jpa.api.dao.DaoRegistry; -import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; -import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; -import ca.uhn.fhir.jpa.api.dao.ReindexParameters; -import ca.uhn.fhir.jpa.api.svc.IIdHelperService; -import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; -import ca.uhn.fhir.parser.DataFormatException; -import ca.uhn.fhir.rest.api.server.RequestDetails; -import ca.uhn.fhir.rest.api.server.SystemRequestDetails; -import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; -import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; -import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; -import ca.uhn.fhir.util.StopWatch; -import jakarta.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class ReindexStep implements IJobStepWorker { - - public static final int REINDEX_MAX_RETRIES = 10; - - private static final Logger ourLog = LoggerFactory.getLogger(ReindexStep.class); - - @Autowired - private HapiTransactionService myHapiTransactionService; - - @Autowired - private IFhirSystemDao mySystemDao; - - @Autowired - private DaoRegistry myDaoRegistry; - - @Autowired - private IIdHelperService myIdHelperService; - - @Nonnull - @Override - public RunOutcome run( - @Nonnull StepExecutionDetails theStepExecutionDetails, - @Nonnull IJobDataSink theDataSink) - throws JobExecutionFailedException { - - ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); - ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); - - return doReindex( - data, - theDataSink, - theStepExecutionDetails.getInstance().getInstanceId(), - theStepExecutionDetails.getChunkId(), - jobParameters); - } - - @Nonnull - public RunOutcome doReindex( - ResourceIdListWorkChunkJson data, - IJobDataSink theDataSink, - String theInstanceId, - String theChunkId, - ReindexJobParameters theJobParameters) { - RequestDetails requestDetails = new SystemRequestDetails(); - requestDetails.setRetry(true); - requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); - TransactionDetails transactionDetails = new TransactionDetails(); - ReindexJob reindexJob = new ReindexJob( - data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters); - - myHapiTransactionService - .withRequest(requestDetails) - .withTransactionDetails(transactionDetails) - .withRequestPartitionId(data.getRequestPartitionId()) - .execute(reindexJob); - - return new RunOutcome(data.size()); - } - - private class ReindexJob implements TransactionCallback { - private final ResourceIdListWorkChunkJson myData; - private final RequestDetails myRequestDetails; - private final TransactionDetails myTransactionDetails; - private final IJobDataSink myDataSink; - private final String myChunkId; - private final String myInstanceId; - private final ReindexJobParameters myJobParameters; - - public ReindexJob( - ResourceIdListWorkChunkJson theData, - RequestDetails theRequestDetails, - TransactionDetails theTransactionDetails, - IJobDataSink theDataSink, - String theInstanceId, - String theChunkId, - ReindexJobParameters theJobParameters) { - myData = theData; - myRequestDetails = theRequestDetails; - myTransactionDetails = theTransactionDetails; - myDataSink = theDataSink; - myInstanceId = theInstanceId; - myChunkId = theChunkId; - myJobParameters = theJobParameters; - myDataSink.setWarningProcessor(new ReindexWarningProcessor()); - } - - @Override - public Void doInTransaction(@Nonnull TransactionStatus theStatus) { - - List persistentIds = myData.getResourcePersistentIds(myIdHelperService); - - ourLog.info( - "Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", - persistentIds.size(), - myInstanceId, - myChunkId); - StopWatch sw = new StopWatch(); - - // Prefetch Resources from DB - - boolean reindexSearchParameters = - myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE; - mySystemDao.preFetchResources(persistentIds, reindexSearchParameters); - ourLog.info( - "Prefetched {} resources in {} - Instance[{}] Chunk[{}]", - persistentIds.size(), - sw, - myInstanceId, - myChunkId); - - ReindexParameters parameters = new ReindexParameters() - .setReindexSearchParameters(myJobParameters.getReindexSearchParameters()) - .setOptimizeStorage(myJobParameters.getOptimizeStorage()) - .setOptimisticLock(myJobParameters.getOptimisticLock()); - - // Reindex - - sw.restart(); - for (int i = 0; i < myData.size(); i++) { - - String nextResourceType = myData.getResourceType(i); - IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextResourceType); - IResourcePersistentId resourcePersistentId = persistentIds.get(i); - try { - - ReindexOutcome outcome = - dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails); - outcome.getWarnings().forEach(myDataSink::recoveredError); - - } catch (BaseServerResponseException | DataFormatException e) { - String resourceForcedId = myIdHelperService - .translatePidIdToForcedIdWithCache(resourcePersistentId) - .orElse(resourcePersistentId.toString()); - String resourceId = nextResourceType + "/" + resourceForcedId; - ourLog.debug("Failure during reindexing {}", resourceId, e); - myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage()); - } - } - - ourLog.info( - "Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]", - persistentIds.size(), - sw, - sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS), - myInstanceId, - myChunkId); - - return null; - } - } -} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java new file mode 100644 index 00000000000..e54b32a90cc --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java @@ -0,0 +1,39 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import com.google.common.annotations.VisibleForTesting; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +public class ReindexUtils { + + /** + * The reindex job definition id + */ + public static final String JOB_REINDEX = "REINDEX"; + + public static final int REINDEX_MAX_RETRIES = 10; + + private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS); + + private static Duration myDelay; + + /** + * Returns the retry delay for reindex jobs that require polling. + */ + public static Duration getRetryLaterDelay() { + if (myDelay != null) { + return myDelay; + } + return RETRY_DELAY; + } + + /** + * Sets the retry delay to use for reindex jobs. + * Do not use this in production code! Only test code. + */ + @VisibleForTesting + public static void setRetryDelay(Duration theDuration) { + myDelay = theDuration; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java new file mode 100644 index 00000000000..0eee14d3ec1 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java @@ -0,0 +1,29 @@ +package ca.uhn.fhir.batch2.jobs.reindex.models; + +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; + +public class ReindexResults implements IModelJson { + + /** + * A map of resource type : whether or not the reindex is completed; + * true = more work needed. false (or omitted) = reindex is done + */ + @JsonProperty("resource2NeedsWork") + private HashMap myResourceToHasWorkToComplete; + + public ReindexResults() {} + + public HashMap getResourceToHasWorkToComplete() { + if (myResourceToHasWorkToComplete == null) { + myResourceToHasWorkToComplete = new HashMap<>(); + } + return myResourceToHasWorkToComplete; + } + + public void addResourceTypeToCompletionStatus(String theResourceType, boolean theRequiresMoreWork) { + getResourceToHasWorkToComplete().put(theResourceType, theRequiresMoreWork); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java new file mode 100644 index 00000000000..39a4273a8b7 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java @@ -0,0 +1,38 @@ +package ca.uhn.fhir.batch2.jobs.reindex.svcs; + +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; + +import java.util.Map; + +public class ReindexJobService { + + private final DaoRegistry myDaoRegistry; + + public ReindexJobService(DaoRegistry theRegistry) { + myDaoRegistry = theRegistry; + } + + /** + * Checks if any of the resource types in the map have any pending reindex work waiting. + * This will return true after the first such encounter, and only return false if no + * reindex work is required for any resource. + * @param theResourceTypesToCheckFlag map of resourceType:whether or not to check + * @return true if there's reindex work pending, false otherwise + */ + public boolean anyResourceHasPendingReindexWork(Map theResourceTypesToCheckFlag) { + for (String resourceType : theResourceTypesToCheckFlag.keySet()) { + boolean toCheck = theResourceTypesToCheckFlag.get(resourceType); + if (toCheck) { + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); + + ReindexJobStatus status = dao.getReindexJobStatus(); + if (status.isHasReindexWorkPending()) { + return true; + } + } + } + return false; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexJobParametersValidatorV1.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexJobParametersValidatorV1.java new file mode 100644 index 00000000000..a54ef49f9eb --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexJobParametersValidatorV1.java @@ -0,0 +1,57 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.reindex.v1; + +import ca.uhn.fhir.batch2.api.IJobParametersValidator; +import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +@Deprecated(forRemoval = true, since = "7.6.0") +public class ReindexJobParametersValidatorV1 implements IJobParametersValidator { + + private final IUrlListValidator myUrlListValidator; + + public ReindexJobParametersValidatorV1(IUrlListValidator theUrlListValidator) { + myUrlListValidator = theUrlListValidator; + } + + @Nullable + @Override + public List validate(RequestDetails theRequestDetails, @Nonnull ReindexJobParameters theParameters) { + List errors = myUrlListValidator.validateUrls(theParameters.getUrls()); + + if (errors == null || errors.isEmpty()) { + // only check if there's no other errors (new list to fix immutable issues) + errors = new ArrayList<>(); + for (String url : theParameters.getUrls()) { + if (url.contains(" ") || url.contains("\n") || url.contains("\t")) { + errors.add("Invalid URL. URL cannot contain spaces : " + url); + } + } + } + return errors; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexStepV1.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexStepV1.java new file mode 100644 index 00000000000..c9adb93b91c --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexStepV1.java @@ -0,0 +1,117 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.reindex.v1; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES; + +@Deprecated(forRemoval = true, since = "7.6.0") +public class ReindexStepV1 implements IJobStepWorker { + + private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class); + + private final HapiTransactionService myHapiTransactionService; + + private final IFhirSystemDao mySystemDao; + + private final DaoRegistry myDaoRegistry; + + private final IIdHelperService> myIdHelperService; + + public ReindexStepV1( + HapiTransactionService theHapiTransactionService, + IFhirSystemDao theSystemDao, + DaoRegistry theRegistry, + IIdHelperService> theIdHelperService) { + myDaoRegistry = theRegistry; + myHapiTransactionService = theHapiTransactionService; + mySystemDao = theSystemDao; + myIdHelperService = theIdHelperService; + } + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) + throws JobExecutionFailedException { + + ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); + ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); + + doReindex( + data, + theDataSink, + theStepExecutionDetails.getInstance().getInstanceId(), + theStepExecutionDetails.getChunkId(), + jobParameters); + + return new RunOutcome(data.size()); + } + + public ReindexResults doReindex( + ResourceIdListWorkChunkJson data, + IJobDataSink theDataSink, + String theInstanceId, + String theChunkId, + ReindexJobParameters theJobParameters) { + RequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setRetry(true); + requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); + + TransactionDetails transactionDetails = new TransactionDetails(); + ReindexTaskV1.JobParameters jp = new ReindexTaskV1.JobParameters(); + jp.setData(data) + .setRequestDetails(requestDetails) + .setTransactionDetails(transactionDetails) + .setDataSink(theDataSink) + .setInstanceId(theInstanceId) + .setChunkId(theChunkId) + .setJobParameters(theJobParameters); + + ReindexTaskV1 reindexJob = new ReindexTaskV1(jp, myDaoRegistry, mySystemDao, myIdHelperService); + + return myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(transactionDetails) + .withRequestPartitionId(data.getRequestPartitionId()) + .execute(reindexJob); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexTaskV1.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexTaskV1.java new file mode 100644 index 00000000000..f6dd6db4a8c --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexTaskV1.java @@ -0,0 +1,202 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v1; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexWarningProcessor; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.parser.DataFormatException; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; +import ca.uhn.fhir.util.StopWatch; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Deprecated(forRemoval = true, since = "7.6.0") +public class ReindexTaskV1 implements TransactionCallback { + private static final Logger ourLog = LoggerFactory.getLogger(ReindexTaskV1.class); + + public static class JobParameters { + private ResourceIdListWorkChunkJson myData; + private RequestDetails myRequestDetails; + private TransactionDetails myTransactionDetails; + private IJobDataSink myDataSink; + private String myChunkId; + private String myInstanceId; + private ReindexJobParameters myJobParameters; + + public ResourceIdListWorkChunkJson getData() { + return myData; + } + + public JobParameters setData(ResourceIdListWorkChunkJson theData) { + myData = theData; + return this; + } + + public RequestDetails getRequestDetails() { + return myRequestDetails; + } + + public JobParameters setRequestDetails(RequestDetails theRequestDetails) { + myRequestDetails = theRequestDetails; + return this; + } + + public TransactionDetails getTransactionDetails() { + return myTransactionDetails; + } + + public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) { + myTransactionDetails = theTransactionDetails; + return this; + } + + public IJobDataSink getDataSink() { + return myDataSink; + } + + public JobParameters setDataSink(IJobDataSink theDataSink) { + myDataSink = theDataSink; + return this; + } + + public String getChunkId() { + return myChunkId; + } + + public JobParameters setChunkId(String theChunkId) { + myChunkId = theChunkId; + return this; + } + + public String getInstanceId() { + return myInstanceId; + } + + public JobParameters setInstanceId(String theInstanceId) { + myInstanceId = theInstanceId; + return this; + } + + public ReindexJobParameters getJobParameters() { + return myJobParameters; + } + + public JobParameters setJobParameters(ReindexJobParameters theJobParameters) { + myJobParameters = theJobParameters; + return this; + } + } + + private final DaoRegistry myDaoRegistry; + private final IFhirSystemDao mySystemDao; + + private final IIdHelperService> myIdHelperService; + + private final ResourceIdListWorkChunkJson myData; + private final RequestDetails myRequestDetails; + private final TransactionDetails myTransactionDetails; + private final IJobDataSink myDataSink; + private final String myChunkId; + private final String myInstanceId; + private final ReindexJobParameters myJobParameters; + + public ReindexTaskV1( + JobParameters theJobParameters, + DaoRegistry theRegistry, + IFhirSystemDao theSystemDao, + IIdHelperService> theIdHelperService) { + myDaoRegistry = theRegistry; + mySystemDao = theSystemDao; + myIdHelperService = theIdHelperService; + + myData = theJobParameters.getData(); + myRequestDetails = theJobParameters.getRequestDetails(); + myTransactionDetails = theJobParameters.getTransactionDetails(); + myDataSink = theJobParameters.getDataSink(); + myInstanceId = theJobParameters.getInstanceId(); + myChunkId = theJobParameters.getChunkId(); + myJobParameters = theJobParameters.getJobParameters(); + myDataSink.setWarningProcessor(new ReindexWarningProcessor()); + } + + @Override + public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) { + List> persistentIds = myData.getResourcePersistentIds(myIdHelperService); + + ourLog.info( + "Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", + persistentIds.size(), + myInstanceId, + myChunkId); + StopWatch sw = new StopWatch(); + ReindexResults reindexResults = new ReindexResults(); + + // Prefetch Resources from DB + boolean reindexSearchParameters = + myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE; + mySystemDao.preFetchResources(persistentIds, reindexSearchParameters); + ourLog.info( + "Prefetched {} resources in {} - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + myInstanceId, + myChunkId); + + ReindexParameters parameters = new ReindexParameters() + .setReindexSearchParameters(myJobParameters.getReindexSearchParameters()) + .setOptimizeStorage(myJobParameters.getOptimizeStorage()) + .setOptimisticLock(myJobParameters.getOptimisticLock()); + + // Reindex + + sw.restart(); + for (int i = 0; i < myData.size(); i++) { + + String nextResourceType = myData.getResourceType(i); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextResourceType); + IResourcePersistentId resourcePersistentId = persistentIds.get(i); + try { + + ReindexOutcome outcome = + dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails); + + outcome.getWarnings().forEach(myDataSink::recoveredError); + reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork()); + + } catch (BaseServerResponseException | DataFormatException e) { + String resourceForcedId = myIdHelperService + .translatePidIdToForcedIdWithCache(resourcePersistentId) + .orElse(resourcePersistentId.toString()); + String resourceId = nextResourceType + "/" + resourceForcedId; + ourLog.error("Failure during reindexing {}", resourceId, e); + myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage()); + } + } + + ourLog.info( + "Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS), + myInstanceId, + myChunkId); + + return reindexResults; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexV1Config.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexV1Config.java new file mode 100644 index 00000000000..8bda0695a5c --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v1/ReindexV1Config.java @@ -0,0 +1,101 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v1; + +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; +import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; + +@Deprecated(forRemoval = true, since = "7.6.0") +@Configuration +public class ReindexV1Config { + @Autowired + private ReindexJobService myReindexJobService; + + @Autowired + private HapiTransactionService myHapiTransactionService; + + @Autowired + private IFhirSystemDao mySystemDao; + + @Autowired + private DaoRegistry myRegistry; + + @Autowired + private IIdHelperService> myIdHelperService; + + @Autowired + @Qualifier("reindexGenerateRangeChunkStepV1") + private IJobStepWorker myReindexGenerateRangeChunkStep; + + @Autowired + @Qualifier("reindexLoadIdsStepV1") + private IJobStepWorker myReindexLoadIdsStep; + + @Autowired + private ReindexJobParametersValidatorV1 myReindexJobParametersValidatorV1; + + // Version 1 + @Bean + public JobDefinition reindexJobDefinitionV1() { + return JobDefinition.newBuilder() + .setJobDefinitionId(JOB_REINDEX) + .setJobDescription("Reindex resources") + .setJobDefinitionVersion(1) + .setParametersType(ReindexJobParameters.class) + .setParametersValidator(myReindexJobParametersValidatorV1) + .gatedExecution() + .addFirstStep( + "generate-ranges", + "Generate data ranges to reindex", + ChunkRangeJson.class, + myReindexGenerateRangeChunkStep) + .addIntermediateStep( + "load-ids", + "Load IDs of resources to reindex", + ResourceIdListWorkChunkJson.class, + myReindexLoadIdsStep) + .addLastStep("reindex-start", "Start the resource reindex", reindexStepV1()) + .build(); + } + + @Bean + public ReindexStepV1 reindexStepV1() { + return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService); + } + + @Bean("reindexGenerateRangeChunkStepV1") + public IJobStepWorker reindexGenerateRangeChunksStep() { + return new GenerateRangeChunksStep<>(); + } + + @Bean("reindexLoadIdsStepV1") + public IJobStepWorker reindexLoadIdsStep( + IBatch2DaoSvc theBatch2DaoSvc) { + return new LoadIdsStep<>(theBatch2DaoSvc); + } + + @Bean + public ReindexJobParametersValidatorV1 reindexJobParametersValidatorV1(IBatch2DaoSvc theBatch2DaoSvc) { + return new ReindexJobParametersValidatorV1( + new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc)); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/CheckPendingReindexWorkStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/CheckPendingReindexWorkStep.java new file mode 100644 index 00000000000..9a9dfd294f9 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/CheckPendingReindexWorkStep.java @@ -0,0 +1,42 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v2; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RetryChunkLaterException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.i18n.Msg; +import jakarta.annotation.Nonnull; + +public class CheckPendingReindexWorkStep implements IJobStepWorker { + + private final ReindexJobService myReindexJobService; + + public CheckPendingReindexWorkStep(ReindexJobService theReindexJobService) { + myReindexJobService = theReindexJobService; + } + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) + throws JobExecutionFailedException { + + ReindexResults results = theStepExecutionDetails.getData(); + + if (!results.getResourceToHasWorkToComplete().isEmpty()) { + if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceToHasWorkToComplete())) { + throw new RetryChunkLaterException(Msg.code(2553), ReindexUtils.getRetryLaterDelay()); + } + } + + return RunOutcome.SUCCESS; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidator.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexJobParametersValidatorV2.java similarity index 85% rename from hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidator.java rename to hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexJobParametersValidatorV2.java index b7560a8ef57..9bfbbfe6564 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidator.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexJobParametersValidatorV2.java @@ -17,10 +17,11 @@ * limitations under the License. * #L% */ -package ca.uhn.fhir.batch2.jobs.reindex; +package ca.uhn.fhir.batch2.jobs.reindex.v2; import ca.uhn.fhir.batch2.api.IJobParametersValidator; import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.rest.api.server.RequestDetails; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; @@ -28,11 +29,11 @@ import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.List; -public class ReindexJobParametersValidator implements IJobParametersValidator { +public class ReindexJobParametersValidatorV2 implements IJobParametersValidator { private final IUrlListValidator myUrlListValidator; - public ReindexJobParametersValidator(IUrlListValidator theUrlListValidator) { + public ReindexJobParametersValidatorV2(IUrlListValidator theUrlListValidator) { myUrlListValidator = theUrlListValidator; } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexStepV2.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexStepV2.java new file mode 100644 index 00000000000..adfad38c1c1 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexStepV2.java @@ -0,0 +1,118 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v2; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RetryChunkLaterException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import jakarta.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES; + +public class ReindexStepV2 + implements IJobStepWorker { + + private final ReindexJobService myReindexJobService; + private final HapiTransactionService myHapiTransactionService; + + private final IFhirSystemDao mySystemDao; + + private final DaoRegistry myDaoRegistry; + + private final IIdHelperService> myIdHelperService; + + public ReindexStepV2( + ReindexJobService theJobService, + HapiTransactionService theHapiTransactionService, + IFhirSystemDao theSystemDao, + DaoRegistry theRegistry, + IIdHelperService> theIdHelperService) { + myDaoRegistry = theRegistry; + myHapiTransactionService = theHapiTransactionService; + mySystemDao = theSystemDao; + myIdHelperService = theIdHelperService; + myReindexJobService = theJobService; + } + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) + throws JobExecutionFailedException { + ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); + ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); + + // This is not strictly necessary; + // but we'll ensure that no outstanding "reindex work" + // is waiting to be completed, so that when we do + // our reindex work here, it won't skip over that data + Map resourceTypesToCheckFlag = new HashMap<>(); + data.getTypedPids().forEach(id -> { + // we don't really care about duplicates; we check by resource type + resourceTypesToCheckFlag.put(id.getResourceType(), true); + }); + if (myReindexJobService.anyResourceHasPendingReindexWork(resourceTypesToCheckFlag)) { + + throw new RetryChunkLaterException(Msg.code(2552), ReindexUtils.getRetryLaterDelay()); + } + + ReindexResults results = doReindex( + data, + theDataSink, + theStepExecutionDetails.getInstance().getInstanceId(), + theStepExecutionDetails.getChunkId(), + jobParameters); + + theDataSink.accept(results); + + return new RunOutcome(data.size()); + } + + public ReindexResults doReindex( + ResourceIdListWorkChunkJson data, + IJobDataSink theDataSink, + String theInstanceId, + String theChunkId, + ReindexJobParameters theJobParameters) { + RequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setRetry(true); + requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); + + TransactionDetails transactionDetails = new TransactionDetails(); + ReindexTaskV2.JobParameters jp = new ReindexTaskV2.JobParameters(); + jp.setData(data) + .setRequestDetails(requestDetails) + .setTransactionDetails(transactionDetails) + .setDataSink(theDataSink) + .setInstanceId(theInstanceId) + .setChunkId(theChunkId) + .setJobParameters(theJobParameters); + + ReindexTaskV2 reindexJob = new ReindexTaskV2(jp, myDaoRegistry, mySystemDao, myIdHelperService); + + return myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(transactionDetails) + .withRequestPartitionId(data.getRequestPartitionId()) + .execute(reindexJob); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexTaskV2.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexTaskV2.java new file mode 100644 index 00000000000..0e77b74f6e6 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexTaskV2.java @@ -0,0 +1,201 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v2; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexWarningProcessor; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.parser.DataFormatException; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; +import ca.uhn.fhir.util.StopWatch; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class ReindexTaskV2 implements TransactionCallback { + private static final Logger ourLog = LoggerFactory.getLogger(ReindexTaskV2.class); + + public static class JobParameters { + private ResourceIdListWorkChunkJson myData; + private RequestDetails myRequestDetails; + private TransactionDetails myTransactionDetails; + private IJobDataSink myDataSink; + private String myChunkId; + private String myInstanceId; + private ReindexJobParameters myJobParameters; + + public ResourceIdListWorkChunkJson getData() { + return myData; + } + + public JobParameters setData(ResourceIdListWorkChunkJson theData) { + myData = theData; + return this; + } + + public RequestDetails getRequestDetails() { + return myRequestDetails; + } + + public JobParameters setRequestDetails(RequestDetails theRequestDetails) { + myRequestDetails = theRequestDetails; + return this; + } + + public TransactionDetails getTransactionDetails() { + return myTransactionDetails; + } + + public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) { + myTransactionDetails = theTransactionDetails; + return this; + } + + public IJobDataSink getDataSink() { + return myDataSink; + } + + public JobParameters setDataSink(IJobDataSink theDataSink) { + myDataSink = theDataSink; + return this; + } + + public String getChunkId() { + return myChunkId; + } + + public JobParameters setChunkId(String theChunkId) { + myChunkId = theChunkId; + return this; + } + + public String getInstanceId() { + return myInstanceId; + } + + public JobParameters setInstanceId(String theInstanceId) { + myInstanceId = theInstanceId; + return this; + } + + public ReindexJobParameters getJobParameters() { + return myJobParameters; + } + + public JobParameters setJobParameters(ReindexJobParameters theJobParameters) { + myJobParameters = theJobParameters; + return this; + } + } + + private final DaoRegistry myDaoRegistry; + private final IFhirSystemDao mySystemDao; + + private final IIdHelperService> myIdHelperService; + + private final ResourceIdListWorkChunkJson myData; + private final RequestDetails myRequestDetails; + private final TransactionDetails myTransactionDetails; + private final IJobDataSink myDataSink; + private final String myChunkId; + private final String myInstanceId; + private final ReindexJobParameters myJobParameters; + + public ReindexTaskV2( + JobParameters theJobParameters, + DaoRegistry theRegistry, + IFhirSystemDao theSystemDao, + IIdHelperService> theIdHelperService) { + myDaoRegistry = theRegistry; + mySystemDao = theSystemDao; + myIdHelperService = theIdHelperService; + + myData = theJobParameters.getData(); + myRequestDetails = theJobParameters.getRequestDetails(); + myTransactionDetails = theJobParameters.getTransactionDetails(); + myDataSink = theJobParameters.getDataSink(); + myInstanceId = theJobParameters.getInstanceId(); + myChunkId = theJobParameters.getChunkId(); + myJobParameters = theJobParameters.getJobParameters(); + myDataSink.setWarningProcessor(new ReindexWarningProcessor()); + } + + @Override + public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) { + List> persistentIds = myData.getResourcePersistentIds(myIdHelperService); + + ourLog.info( + "Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", + persistentIds.size(), + myInstanceId, + myChunkId); + StopWatch sw = new StopWatch(); + ReindexResults reindexResults = new ReindexResults(); + + // Prefetch Resources from DB + boolean reindexSearchParameters = + myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE; + mySystemDao.preFetchResources(persistentIds, reindexSearchParameters); + ourLog.info( + "Prefetched {} resources in {} - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + myInstanceId, + myChunkId); + + ReindexParameters parameters = new ReindexParameters() + .setReindexSearchParameters(myJobParameters.getReindexSearchParameters()) + .setOptimizeStorage(myJobParameters.getOptimizeStorage()) + .setOptimisticLock(myJobParameters.getOptimisticLock()); + + // Reindex + + sw.restart(); + for (int i = 0; i < myData.size(); i++) { + + String nextResourceType = myData.getResourceType(i); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextResourceType); + IResourcePersistentId resourcePersistentId = persistentIds.get(i); + try { + + ReindexOutcome outcome = + dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails); + + outcome.getWarnings().forEach(myDataSink::recoveredError); + reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork()); + + } catch (BaseServerResponseException | DataFormatException e) { + String resourceForcedId = myIdHelperService + .translatePidIdToForcedIdWithCache(resourcePersistentId) + .orElse(resourcePersistentId.toString()); + String resourceId = nextResourceType + "/" + resourceForcedId; + ourLog.error("Failure during reindexing {}", resourceId, e); + myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage()); + } + } + + ourLog.info( + "Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS), + myInstanceId, + myChunkId); + + return reindexResults; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexV2Config.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexV2Config.java new file mode 100644 index 00000000000..6cc4553c64b --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/v2/ReindexV2Config.java @@ -0,0 +1,110 @@ +package ca.uhn.fhir.batch2.jobs.reindex.v2; + +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; +import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX; + +@Configuration +public class ReindexV2Config { + + @Autowired + private ReindexJobService myReindexJobService; + + @Autowired + private HapiTransactionService myHapiTransactionService; + + @Autowired + private IFhirSystemDao mySystemDao; + + @Autowired + private DaoRegistry myRegistry; + + @Autowired + private IIdHelperService> myIdHelperService; + + @Autowired + @Qualifier("reindexGenerateRangeChunkStepV2") + private IJobStepWorker myReindexGenerateRangeChunkStep; + + @Autowired + @Qualifier("reindexLoadIdsStepV2") + private IJobStepWorker myReindexLoadIdsStep; + + @Autowired + private ReindexJobParametersValidatorV2 myReindexJobParametersValidator; + + // Version 2 + @Bean + public JobDefinition reindexJobDefinitionV2() { + return JobDefinition.newBuilder() + .setJobDefinitionId(JOB_REINDEX) + .setJobDescription("Reindex resources") + .setJobDefinitionVersion(2) + .setParametersType(ReindexJobParameters.class) + .setParametersValidator(myReindexJobParametersValidator) + .gatedExecution() + .addFirstStep( + "generate-ranges", + "Generate data ranges to reindex", + ChunkRangeJson.class, + myReindexGenerateRangeChunkStep) + .addIntermediateStep( + "load-ids", + "Load IDs of resources to reindex", + ResourceIdListWorkChunkJson.class, + myReindexLoadIdsStep) + .addIntermediateStep( + "reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2()) + .addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep()) + .build(); + } + + @Bean + public CheckPendingReindexWorkStep pendingWorkStep() { + return new CheckPendingReindexWorkStep(myReindexJobService); + } + + @Bean + public ReindexStepV2 reindexStepV2() { + return new ReindexStepV2( + myReindexJobService, myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService); + } + + @Bean("reindexGenerateRangeChunkStepV2") + public IJobStepWorker reindexGenerateRangeChunksStep() { + return new GenerateRangeChunksStep<>(); + } + + @Bean("reindexLoadIdsStepV2") + public IJobStepWorker reindexLoadIdsStep( + IBatch2DaoSvc theBatch2DaoSvc) { + return new LoadIdsStep<>(theBatch2DaoSvc); + } + + @Bean + public ReindexJobParametersValidatorV2 reindexJobParametersValidatorV2(IBatch2DaoSvc theBatch2DaoSvc) { + return new ReindexJobParametersValidatorV2( + new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc)); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java similarity index 86% rename from hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java rename to hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java index 0fe1c00e461..f03db306889 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.batch2.jobs.reindex; import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexJobParametersValidatorV1; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -13,13 +14,13 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(MockitoExtension.class) -public class ReindexJobParametersValidatorTest { +public class ReindexTaskParametersValidatorTest { @Mock private UrlListValidator myListValidator; @InjectMocks - private ReindexJobParametersValidator myValidator; + private ReindexJobParametersValidatorV1 myValidator; @ParameterizedTest @ValueSource(strings = { "\n", " ", "\t" }) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/RetryChunkLaterException.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/RetryChunkLaterException.java index ec35cb770d2..0d132fc7191 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/RetryChunkLaterException.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/RetryChunkLaterException.java @@ -38,11 +38,18 @@ public class RetryChunkLaterException extends RuntimeException { private final Duration myNextPollDuration; public RetryChunkLaterException() { - this(ONE_MINUTE); + this("", ONE_MINUTE); } + /** + * For HAPI exceptions, use {@link RetryChunkLaterException#RetryChunkLaterException(String, Duration)} + */ public RetryChunkLaterException(Duration theDuration) { - super(); + this("", theDuration); + } + + public RetryChunkLaterException(String theCode, Duration theDuration) { + super(theCode); this.myNextPollDuration = theDuration; } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java index b7969f1ae1c..0ecacb9bdf6 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java @@ -62,7 +62,7 @@ public class ResourceIdListWorkChunkJson implements IModelJson { return myRequestPartitionId; } - private List getTypedPids() { + public List getTypedPids() { if (myTypedPids == null) { myTypedPids = new ArrayList<>(); } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java index 7082fed8240..d918f5572ce 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.model.DeleteConflictList; import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.ExpungeOutcome; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.entity.TagTypeEnum; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; @@ -315,12 +316,24 @@ public interface IFhirResourceDao extends IDao { * @param theResourcePersistentId The ID * @return */ + @SuppressWarnings("rawtypes") ReindexOutcome reindex( IResourcePersistentId theResourcePersistentId, ReindexParameters theReindexParameters, RequestDetails theRequest, TransactionDetails theTransactionDetails); + /** + * Returns ReindexJobStatus information object that tells the caller + * if a reindex job is still in progress or not. + * + * If the implementing DAO requires additional work during reindexing, + * this is the method to override. + */ + default ReindexJobStatus getReindexJobStatus() { + return ReindexJobStatus.NO_WORK_NEEDED; + } + void removeTag( IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode, RequestDetails theRequestDetails); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java index 8c7eb7fa5b9..744e1def01f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java @@ -29,6 +29,11 @@ public class ReindexOutcome { private List myWarnings; + /** + * True if there is additional (async) work to wait on. + */ + private boolean myHasPendingWork; + public List getWarnings() { return defaultIfNull(myWarnings, Collections.emptyList()); } @@ -39,4 +44,12 @@ public class ReindexOutcome { } myWarnings.add(theWarning); } + + public boolean isHasPendingWork() { + return myHasPendingWork; + } + + public void setHasPendingWork(boolean theHasPendingWork) { + myHasPendingWork = theHasPendingWork; + } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java new file mode 100644 index 00000000000..227756c5e44 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java @@ -0,0 +1,16 @@ +package ca.uhn.fhir.jpa.api.model; + +public class ReindexJobStatus { + + public static ReindexJobStatus NO_WORK_NEEDED = new ReindexJobStatus(); + + private boolean myHasReindexWorkPending; + + public boolean isHasReindexWorkPending() { + return myHasReindexWorkPending; + } + + public void setHasReindexWorkPending(boolean theHasReindexWorkPending) { + myHasReindexWorkPending = theHasReindexWorkPending; + } +}