From bb5d91ac373608144886cac1123975a425de0c93 Mon Sep 17 00:00:00 2001 From: leif stawnyczy Date: Thu, 26 Sep 2024 16:02:46 -0400 Subject: [PATCH] adding a new version of reindex batch job --- .../jpa/dao/JpaResourceDaoCodeSystem.java | 45 + .../jpa/term/TermDeferredStorageSvcImpl.java | 1 + ...esourceDaoR4IndexStorageOptimizedTest.java | 4 +- .../r4/FhirResourceDaoR4QueryCountTest.java | 32 +- ...exStepTest.java => ReindexStepV1Test.java} | 8 +- ...exStepTest.java => ReindexStepV1Test.java} | 81 +- ...indexJobTest.java => ReindexTaskTest.java} | 2 +- ...a => ReindexTaskWithPartitioningTest.java} | 2 +- .../jpa/term/ValueSetExpansionR4Test.java | 805 ++++-------------- .../ca/uhn/fhir/jpa/test/BaseJpaR4Test.java | 3 + .../batch2/jobs/reindex/BaseReindexStep.java | 69 ++ .../reindex/CheckPendingReindexWorkStep.java | 42 + .../batch2/jobs/reindex/ReindexAppCtx.java | 81 +- .../fhir/batch2/jobs/reindex/ReindexStep.java | 203 ----- .../batch2/jobs/reindex/ReindexStepV1.java | 66 ++ .../batch2/jobs/reindex/ReindexStepV2.java | 65 ++ .../fhir/batch2/jobs/reindex/ReindexTask.java | 201 +++++ .../batch2/jobs/reindex/ReindexUtils.java | 28 + .../jobs/reindex/models/ReindexResults.java | 30 + .../jobs/reindex/svcs/ReindexJobService.java | 38 + ...> ReindexTaskParametersValidatorTest.java} | 2 +- .../chunk/ResourceIdListWorkChunkJson.java | 2 +- .../fhir/jpa/api/dao/IFhirResourceDao.java | 13 + .../uhn/fhir/jpa/api/dao/ReindexOutcome.java | 13 + .../fhir/jpa/api/model/ReindexJobStatus.java | 16 + 25 files changed, 941 insertions(+), 911 deletions(-) rename hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/{ReindexStepTest.java => ReindexStepV1Test.java} (92%) rename hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/{ReindexStepTest.java => ReindexStepV1Test.java} (80%) rename hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/{ReindexJobTest.java => ReindexTaskTest.java} (99%) rename hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/{ReindexJobWithPartitioningTest.java => ReindexTaskWithPartitioningTest.java} (99%) create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/BaseReindexStep.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/CheckPendingReindexWorkStep.java delete mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV1.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV2.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTask.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java create mode 100644 hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java rename hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/{ReindexJobParametersValidatorTest.java => ReindexTaskParametersValidatorTest.java} (95%) create mode 100644 hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java index 5e8f6b053bb..5ade0c2937d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/JpaResourceDaoCodeSystem.java @@ -27,7 +27,11 @@ import ca.uhn.fhir.context.support.IValidationSupport.CodeValidationResult; import ca.uhn.fhir.context.support.LookupCodeRequest; import ca.uhn.fhir.context.support.ValidationSupportContext; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.entity.ResourceTable; @@ -176,6 +180,47 @@ public class JpaResourceDaoCodeSystem extends BaseHapiF myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete); } + /** + * If there are more code systems to process + * than {@link JpaStorageSettings#getDeferIndexingForCodesystemsOfSize()}, + * then these codes will have their processing deferred (for a later time). + * + * This can result in future reindex steps *skipping* these code systems (if + * they're still deferred) and thus incorrect expansions resulting. + * + * So we override the reindex method for CodeSystems specifically to + * force reindex batch jobs to wait until all code systems are processed before + * moving on. + */ + @SuppressWarnings("rawtypes") + @Override + public ReindexOutcome reindex( + IResourcePersistentId thePid, + ReindexParameters theReindexParameters, + RequestDetails theRequest, + TransactionDetails theTransactionDetails) { + ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails); + + if (outcome.getWarnings().isEmpty()) { + outcome.setHasPendingWork(true); + } + return outcome; + } + + @Override + public ReindexJobStatus getReindexJobStatus() { + boolean isQueueEmpty = myTermDeferredStorageSvc.isStorageQueueEmpty(true); + + ReindexJobStatus status = new ReindexJobStatus(); + status.setHasReindexWorkPending(!isQueueEmpty); + if (status.isHasReindexWorkPending()) { + // force a run + myTermDeferredStorageSvc.saveDeferred(); + } + + return status; + } + @Override public ResourceTable updateEntity( RequestDetails theRequest, diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java index 408f85c1cd3..6613284d097 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/TermDeferredStorageSvcImpl.java @@ -23,6 +23,7 @@ import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java index d94b0089337..6523bdd1373 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4IndexStorageOptimizedTest.java @@ -23,7 +23,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri; import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.util.SearchParamHash; import ca.uhn.fhir.jpa.model.util.UcumServiceUtil; -import ca.uhn.fhir.jpa.reindex.ReindexStepTest; +import ca.uhn.fhir.jpa.reindex.ReindexStepV1Test; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.rest.param.BaseParam; @@ -321,7 +321,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test { // Additional existing tests with enabled IndexStorageOptimized @Nested - public class IndexStorageOptimizedReindexStepTest extends ReindexStepTest { + public class IndexStorageOptimizedReindexStepTestV1 extends ReindexStepV1Test { @BeforeEach void setUp() { myStorageSettings.setIndexStorageOptimized(true); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java index 52735147cd9..0ff0fbe6ca0 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java @@ -2,12 +2,15 @@ package ca.uhn.fhir.jpa.dao.r4; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.support.ValidationSupportContext; import ca.uhn.fhir.context.support.ValueSetExpansionOptions; @@ -18,13 +21,13 @@ import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum; import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao; -import ca.uhn.fhir.jpa.reindex.ReindexTestHelper; import ca.uhn.fhir.jpa.entity.TermValueSet; import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum; import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; +import ca.uhn.fhir.jpa.reindex.ReindexTestHelper; import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; @@ -146,7 +149,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test @Autowired private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc; @Autowired - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Autowired private DeleteExpungeStep myDeleteExpungeStep; @Autowired @@ -1018,7 +1021,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // insert to: HFJ_RESOURCE, HFJ_RES_VER, HFJ_RES_LINK assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); - } @ParameterizedTest @@ -1031,7 +1033,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test }) public void testReindexJob_OptimizeStorage(boolean theOptimisticLock, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageModeEnum, int theExpectedSelectCount, int theExpectedUpdateCount) { // Setup - ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(); IIdType patientId = createPatient(withActiveTrue()); IIdType orgId = createOrganization(withName("MY ORG")); @@ -1056,7 +1057,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // execute myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class)); // validate assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(theExpectedSelectCount); @@ -1064,7 +1072,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertThat(myCaptureQueriesListener.getInsertQueriesForCurrentThread()).isEmpty(); assertThat(myCaptureQueriesListener.getDeleteQueriesForCurrentThread()).isEmpty(); assertEquals(10, outcome.getRecordsProcessed()); - } @Test @@ -1095,7 +1102,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test // execute myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class)); assertEquals(20, outcome.getRecordsProcessed()); // validate @@ -1103,10 +1117,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size()); assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size()); assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size()); - } - public void assertNoPartitionSelectors() { List selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread(); for (SqlQuery next : selectQueries) { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java similarity index 92% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java index 69ed1e1507c..95d2920706b 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ReindexStepV1Test.java @@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class ReindexStepTest { +public class ReindexStepV1Test { @Mock private HapiTransactionService myHapiTransactionService; @@ -34,7 +34,7 @@ public class ReindexStepTest { private IJobDataSink myDataSink; @InjectMocks - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Captor private ArgumentCaptor builderArgumentCaptor; @@ -51,7 +51,7 @@ public class ReindexStepTest { when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod(); // when - myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters); + myReindexStepV1.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters); // then assertMethodArgumentRequestPartitionId(expectedPartitionId); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java similarity index 80% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java index d7ed24e6789..13e58a760da 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepV1Test.java @@ -2,10 +2,13 @@ package ca.uhn.fhir.jpa.reindex; import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; -import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; @@ -25,13 +28,14 @@ import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED; import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class ReindexStepTest extends BaseJpaR4Test { +public class ReindexStepV1Test extends BaseJpaR4Test { @Autowired - private ReindexStep myReindexStep; + private ReindexStepV1 myReindexStepV1; @Mock private IJobDataSink myDataSink; @@ -46,9 +50,7 @@ public class ReindexStepTest extends BaseJpaR4Test { @Test public void testReindex_NoActionNeeded() { - // Setup - Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong(); @@ -57,9 +59,19 @@ public class ReindexStepTest extends BaseJpaR4Test { data.addTypedPid("Patient", id1); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + instance.setInstanceId("index-id"); + WorkChunk chunk = new WorkChunk(); + chunk.setId("chunk-id"); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + chunk + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -72,12 +84,9 @@ public class ReindexStepTest extends BaseJpaR4Test { assertEquals(0, myCaptureQueriesListener.getRollbackCount()); } - @Test public void testReindex_NoActionNeeded_IndexMissingFieldsEnabled() { - // Setup - myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED); Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); @@ -88,9 +97,16 @@ public class ReindexStepTest extends BaseJpaR4Test { data.addTypedPid("Patient", id1); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -121,9 +137,16 @@ public class ReindexStepTest extends BaseJpaR4Test { }); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -136,12 +159,9 @@ public class ReindexStepTest extends BaseJpaR4Test { assertEquals(0, myCaptureQueriesListener.getRollbackCount()); } - @Test public void testReindex_IndexesAddedAndRemoved_IndexMissingFieldsEnabled() { - // Setup - myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED); boolean markResourcesForReindexingUponSearchParameterChange = myStorageSettings.isMarkResourcesForReindexingUponSearchParameterChange(); myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false); // if this were true, it would set up a lot of reindex jobs extraneous to the one we're trying to test @@ -189,9 +209,16 @@ public class ReindexStepTest extends BaseJpaR4Test { mySearchParamRegistry.forceRefresh(); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + mock(WorkChunk.class) + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(2, outcome.getRecordsProcessed()); @@ -207,9 +234,7 @@ public class ReindexStepTest extends BaseJpaR4Test { @Test public void testReindex_OneResourceReindexFailedButOthersSucceeded() { - // Setup - Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong(); Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong(); Long idPatientToInvalidate = createPatient().getIdPartAsLong(); @@ -234,9 +259,19 @@ public class ReindexStepTest extends BaseJpaR4Test { }); // Execute - + ReindexJobParameters params = new ReindexJobParameters(); myCaptureQueriesListener.clear(); - RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters()); + JobInstance instance = new JobInstance(); + instance.setInstanceId("index-id"); + WorkChunk workChunk = new WorkChunk(); + workChunk.setId("workid"); + StepExecutionDetails stepExecutionDetails = new StepExecutionDetails<>( + params, + data, + instance, + workChunk + ); + RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink); // Verify assertEquals(4, outcome.getRecordsProcessed()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java similarity index 99% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java index fa9d93fb4ab..d78e456092c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskTest.java @@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @SuppressWarnings("SqlDialectInspection") -public class ReindexJobTest extends BaseJpaR4Test { +public class ReindexTaskTest extends BaseJpaR4Test { @Autowired private IJobCoordinator myJobCoordinator; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java similarity index 99% rename from hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java rename to hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java index 8a643006439..4ffabc3dafc 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexJobWithPartitioningTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexTaskWithPartitioningTest.java @@ -27,7 +27,7 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { +public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test { @Autowired private IJobCoordinator myJobCoordinator; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java index c3d3a1ade89..9ea43c57dbb 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4Test.java @@ -1,8 +1,8 @@ package ca.uhn.fhir.jpa.term; -import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; +import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.RuntimeSearchParam; @@ -36,7 +36,6 @@ import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; -import ca.uhn.fhir.util.BundleBuilder; import com.google.common.collect.Lists; import jakarta.annotation.Nonnull; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -54,22 +53,19 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME; -import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME; import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -2099,659 +2095,156 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet } @Test - public void expandValuesetAfterReindex() { + public void reindexCodeSystems_withDeferredCodeSystems_reindexesAllCodeSystems() { // setup - IParser parser = myFhirContext.newJsonParser(); - CodeSystem cs; - ValueSet vs; - String csStr; - { - String vsStr = """ + int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize(); + + try { + // deferred count must be less than the number of concepts on the + // CodeSystem we will upload + myStorageSettings.setDeferIndexingForCodesystemsOfSize(3); + ReindexUtils.setRetryDelay(Duration.of(500, ChronoUnit.MILLIS)); + + IParser parser = myFhirContext.newJsonParser(); + + RequestDetails rq = new SystemRequestDetails(); + CodeSystem cs; + ValueSet vs; + String csStr; + { + String vsStr = """ + { + "resourceType": "ValueSet", + "id": "0447bffa-01fa-4405-828a-96192e74a5d8", + "meta": { + "versionId": "2", + "lastUpdated": "2024-04-09T15:06:24.025+00:00", + "source": "#f4491e490a6a2900" + }, + "url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types", + "version": "1.0.0", + "name": "IDMS-SUBMISSION-TYPES", + "title": "IDMS Submission Types", + "status": "active", + "experimental": false, + "date": "2023-09-28", + "publisher": "IDMS", + "description": "List of Submission Types", + "compose": { + "include": [ + { + "system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types" + } + ] + } + } + """; + vs = parser.parseResource(ValueSet.class, vsStr); + csStr = """ { - "resourceType": "ValueSet", - "id": "0447bffa-01fa-4405-828a-96192e74a5d8", - "meta": { - "versionId": "2", - "lastUpdated": "2024-04-09T15:06:24.025+00:00", - "source": "#f4491e490a6a2900" - }, - "url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types", - "version": "1.0.0", - "name": "IDMS-SUBMISSION-TYPES", - "title": "IDMS Submission Types", - "status": "active", - "experimental": false, - "date": "2023-09-28", - "publisher": "IDMS", - "description": "List of Submission Types", - "compose": { - "include": [ - { - "system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types" - } - ] - } - } - """; - vs = parser.parseResource(ValueSet.class, vsStr); - csStr = """ - { - "resourceType": "CodeSystem", - "id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3", - "meta": { - "versionId": "14", - "lastUpdated": "2024-06-03T17:49:56.580+00:00", - "source": "#261a82258b0978a8" - }, - "url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types", - "version": "1.0.0", - "name": "IDMS-Internal-Submission-Types", - "status": "active", - "date": "2023-09-07", - "publisher": "IDMS", - "description": "This contains a lists of codes Submission Type Codes.", - "content": "complete", - "concept": [ - { - "code": "SUB-BRAND-PRODUCT", - "display": "New Brand Product (Non-New Chemical Entity)" - }, - { - "code": "SUB-CLASSIFICATION", - "display": "Classification Change" - }, - { - "code": "SUB-CLINICIAN-LED-SUBMISSIONS", - "display": "Clinician-Led Submissions" - }, - { - "code": "SUB-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-DIN-CHANGE", - "display": "Drug Identification Number (DIN) Change" - }, - { - "code": "SUB-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-DRUG-PRODUCT-NAME", - "display": "Drug Product Name Change" - }, - { - "code": "SUB-FORMULATION", - "display": "Formulation Change" - }, - { - "code": "SUB-GENERIC-LINE", - "display": "Generic Line Extension" - }, - { - "code": "SUB-GENERIC-PRODUCT", - "display": "New Generic Product" - }, - { - "code": "SUB-LINE-EXTENSION", - "display": "Line Extension" - }, - { - "code": "SUB-NEW-INDICATION", - "display": "New Indication" - }, - { - "code": "SUB-NEW-NATURAL", - "display": "New Natural Health Product" - }, - { - "code": "SUB-NEW-OVER-COUNTER", - "display": "New Over the Counter (OTC) Product" - }, - { - "code": "SUB-NEW-CHEMICAL", - "display": "New Chemical Entity" - }, - { - "code": "SUB-NEW-NUTRITIONAL", - "display": "New Nutrition Product" - }, - { - "code": "SUB-NUTRITIONAL-LINE", - "display": "Line Extension" - }, - { - "code": "SUB-NEW-BIOSIMILAR", - "display": "New Biosimilar" - }, - { - "code": "SUB-BIOSIMILAR-LINE", - "display": "Line Extension" - }, - { - "code": "SUB-NEW-DIABETIC", - "display": "New Diabetic Testing Agent (DTA)" - }, - { - "code": "SUB-NEW-GLUCOUSE", - "display": "New Flash Glucose Monitoring (FGM) Product" - }, - { - "code": "SUB-NON-FORMULARY-SINGLE", - "display": "Non-Formulary Single Source" - }, - { - "code": "SUB-NON-FORMULARY-MULTIPLE", - "display": "Non-Formulary Multiple Source " - }, - { - "code": "SUB-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-PRODUCT-MONOGRAPH", - "display": "Product Monograph Change" - }, - { - "code": "SUB-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-SPECIAL-PROJECTS", - "display": "Special Projects" - }, - { - "code": "SUB-TEMPORARY-BENEFITS", - "display": "Temporary Benefits" - }, - { - "code": "SUB-VALVED", - "display": "New Valved Holding Chamber (VHC)" - }, - { - "code": "SUB-REDESIGNATION-BENEFITS", - "display": "Re-Designation of Benefits" - }, - { - "code": "SUB-MANUFACTURER-INQUIRIES", - "display": "Manufacturer Inquiries" - }, - { - "code": "SUB-CHALLENGES", - "display": "Challenges" - }, - { - "code": "SUB-NEW-LINE", - "display": "Line Extension" - }, - { - "code": "SUB-MULTI-DIN-CHANGE", - "display": "Drug Identification Number (DIN) Change" - }, - { - "code": "SUB-MULTI-DRUG-PRODUCT-NAME", - "display": "Drug Product Name Change" - }, - { - "code": "SUB-MULTI-PRODUCT-MONOGRAPH", - "display": "Product Monograph Change" - }, - { - "code": "SUB-MULTI-FORMULATION", - "display": "Formulation Change" - }, - { - "code": "SUB-MULTI-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-MULTI-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-MULTI-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-MULTI-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-MULTI-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-MULTI-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-MULTI-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-BIO-DIN-CHANGE", - "display": "Drug Identification Number (DIN) Change" - }, - { - "code": "SUB-BIO-DRUG-PRODUCT-NAME", - "display": "Drug Product Name Change" - }, - { - "code": "SUB-BIO-PRODUCT-MONOGRAPH", - "display": "Product Monograph Change" - }, - { - "code": "SUB-BIO-FORMULATION", - "display": "Formulation Change" - }, - { - "code": "SUB-BIO-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-BIO-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-BIO-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-BIO-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-BIO-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-BIO-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-BIO-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-OTC-DIN-CHANGE", - "display": "Drug Identification Number (DIN) Change" - }, - { - "code": "SUB-OTC-DRUG-PRODUCT-NAME", - "display": "Drug Product Name Change" - }, - { - "code": "SUB-OTC-PRODUCT-MONOGRAPH", - "display": "Product Monograph Change" - }, - { - "code": "SUB-OTC-FORMULATION", - "display": "Formulation Change" - }, - { - "code": "SUB-OTC-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-OTC-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-OTC-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-OTC-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-OTC-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-OTC-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-OTC-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-DIABETIC-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-DIABETIC-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-DIABETIC-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-DIABETIC-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-DIABETIC-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-DIABETIC-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-DIABETIC-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-DIABETIC-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-DIABETIC-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-GLUCOSE-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-GLUCOSE-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-GLUCOSE-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-GLUCOSE-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-GLUCOSE-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-GLUCOSE-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-GLUCOSE-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-GLUCOSE-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-GLUCOSE-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-VALVED-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-VALVED-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-VALVED-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-VALVED-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-VALVED-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-VALVED-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-VALVED-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-VALVED-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-VALVED-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-NATURAL-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-NATURAL-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-NATURAL-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-NATURAL-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-NATURAL-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-NATURAL-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-NATURAL-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-NATURAL-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-NATURAL-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-OTHER-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-OTHER-PRODUCT-CHANGE", - "display": "Product Change" - }, - { - "code": "SUB-OTHER-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-OTHER-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-OTHER-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-OTHER-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-OTHER-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-OTHER-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-OTHER-OTHER-TYPE-IN", - "display": "Other" - }, - { - "code": "SUB-NUTRITIONAL-PRODUCT-NAME", - "display": "Product Name Change" - }, - { - "code": "SUB-NUTRITIONAL-FORMULATION", - "display": "Formulation Change" - }, - { - "code": "SUB-NUTRITIONAL-DRUG-REGULATION", - "display": "Change in Food and Drug Regulation Classification" - }, - { - "code": "SUB-ADVERTISING-POLICY", - "display": "Change in Advertising Policy" - }, - { - "code": "SUB-NUTRITIONAL-OWNERSHIP", - "display": "Ownership Change" - }, - { - "code": "SUB-NUTRITIONAL-DISTRIBUTOR", - "display": "Distributor Change" - }, - { - "code": "SUB-NUTRITIONAL-ORG-NAME", - "display": "Organization Name Change" - }, - { - "code": "SUB-NUTRITIONAL-PRODUCT-DISCONTINUE", - "display": "Product Discontinuation" - }, - { - "code": "SUB-NUTRITIONAL-DELISTING", - "display": "Delisting" - }, - { - "code": "SUB-NUTRITIONAL-PRICE-CHANGE", - "display": "Price Change" - }, - { - "code": "SUB-NUTRITIONAL-OTHER-TYPE-IN", - "display": "Other" - } - ] - } - """; - cs = parser.parseResource(CodeSystem.class, csStr); - } - - RequestDetails rq = new SystemRequestDetails(); - int csToCreate = 1; - - // create a bunch of code systems - myValueSetDao.update(vs, rq); - String csUrl = cs.getUrl(); - BundleBuilder builder = new BundleBuilder(myFhirContext); -// while (cs.getConcept().size() > 90) { -// cs.getConcept().remove(1); -// } - for (int i = 0; i < csToCreate; i++) { - if (i > 0) { + "resourceType": "CodeSystem", + "id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3", + "meta": { + "versionId": "14", + "lastUpdated": "2024-06-03T17:49:56.580+00:00", + "source": "#261a82258b0978a8" + }, + "url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types", + "version": "1.0.0", + "name": "IDMS-Internal-Submission-Types", + "status": "active", + "date": "2023-09-07", + "publisher": "IDMS", + "description": "This contains a lists of codes Submission Type Codes.", + "content": "complete", + "concept": [ + { + "code": "SUB-BRAND-PRODUCT", + "display": "New Brand Product (Non-New Chemical Entity)" + }, + { + "code": "SUB-CLASSIFICATION", + "display": "Classification Change" + }, + { + "code": "SUB-CLINICIAN-LED-SUBMISSIONS", + "display": "Clinician-Led Submissions" + }, + { + "code": "SUB-DELISTING", + "display": "Delisting" + }, + { + "code": "SUB-DIN-CHANGE", + "display": "Drug Identification Number (DIN) Change" + }, + { + "code": "SUB-DISTRIBUTOR", + "display": "Distributor Change" + } + ] + } + """; cs = parser.parseResource(CodeSystem.class, csStr); - cs.setId("cs" + i); - cs.setUrl(csUrl + "-" + i); } - builder.addTransactionUpdateEntry(cs); -// myCodeSystemDao.update(cs, rq); + + // create our ValueSet + myValueSetDao.update(vs, rq); + + // and the code system + myCodeSystemDao.update(cs, rq); + + // sanity check to make sure our code system was actually created + SearchParameterMap spMap = new SearchParameterMap(); + spMap.setLoadSynchronous(true); + IBundleProvider bp = myCodeSystemDao.search(spMap, rq); + assertEquals(1, bp.getAllResources().size()); + IBaseResource baseResource = bp.getAllResources().get(0); + CodeSystem cssaved; + if (baseResource instanceof CodeSystem saved) { + cssaved = saved; + } else { + fail("Should be a code system"); + return; + } + assertEquals(cs.getConcept().size(), cssaved.getConcept().size()); + + // test + // perform the reindex (we'll only target the CodeSystem here) + ReindexJobParameters params = new ReindexJobParameters(); + params.addUrl("CodeSystem?"); + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters(params); + + // and wait for it to complete + Batch2JobStartResponse response = myJobCoordinator.startInstance(rq, startRequest); + myBatch2JobHelper.awaitJobCompletion(response); + + // verify by doing the value expansion + ValueSetExpansionOptions options = new ValueSetExpansionOptions(); + options.setCount(200); // this is way more than exist, so it's ok + ValueSet expanded = myValueSetDao.expand(vs, options); + assertNotNull(expanded); + + /* + * If the reindex was performed correctly, the expanded ValueSet + * should contain all the CodeSystems that we originally + * uploaded (and nothing else). + */ + HashSet all = new HashSet<>(); + for (CodeSystem.ConceptDefinitionComponent set : cs.getConcept()) { + all.add(set.getCode()); + } + for (ValueSet.ValueSetExpansionContainsComponent v : expanded.getExpansion().getContains()) { + all.remove(v.getCode()); + } + assertTrue(all.isEmpty(), String.join(", ", all)); + assertEquals(cs.getConcept().size(), expanded.getExpansion().getTotal()); + } finally { + // set back to standard values + myStorageSettings.setDeferIndexingForCodesystemsOfSize(deferredIndexingDefault); + ReindexUtils.setRetryDelay(null); } - builder.setType("transaction"); - Bundle bundle = (Bundle) builder.getBundle(); - mySystemDao.transaction(rq, bundle); - - System.out.println("XXXX " + myDeferredStorageSvc.isStorageQueueEmpty()); - myDeferredStorageSvc.logQueueForUnitTest(); - - - AtomicInteger counter = new AtomicInteger(); - await() - .atMost(10, TimeUnit.SECONDS) - .pollDelay(500, TimeUnit.MILLISECONDS) - .until(() -> { - boolean isQueueEmpty = myDeferredStorageSvc.isStorageQueueEmpty(false); - if (!isQueueEmpty) { - myDeferredStorageSvc.saveAllDeferred(); - } - return isQueueEmpty; - }); - if (myDeferredStorageSvc.isJobsExecuting()) { - System.out.println("running jobs"); - myDeferredStorageSvc.saveAllDeferred(); - myBatch2JobHelper.awaitJobCompletion(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME); - myBatch2JobHelper.awaitJobCompletion(TERM_CODE_SYSTEM_DELETE_JOB_NAME); - - } - System.out.println("XXX " + myDeferredStorageSvc.isStorageQueueEmpty() + " " + counter.get()); - myDeferredStorageSvc.logQueueForUnitTest(); - myDeferredStorageSvc.toString(); - -// myDeferredStorageSvc.saveAllDeferred(); - - // perform the reindex - ReindexJobParameters params = new ReindexJobParameters(); - params.addUrl("CodeSystem?"); - JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); - startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); - startRequest.setParameters(params); - System.out.println("xxxx starting reindex"); - Batch2JobStartResponse response = myJobCoordinator.startInstance(rq, startRequest); - myBatch2JobHelper.awaitJobCompletion(response); - System.out.println("XXXX reindex done " + myDeferredStorageSvc.isStorageQueueEmpty()); - - // get the codes - SearchParameterMap spMap = new SearchParameterMap(); - spMap.setLoadSynchronous(true); - IBundleProvider bp = myCodeSystemDao.search(spMap, rq); - - // do value expansion - ValueSetExpansionOptions options = new ValueSetExpansionOptions(); - options.setCount(200); - ValueSet expanded = myValueSetDao.expand(vs, options); - assertNotNull(expanded); - HashSet all = new HashSet<>(); - for (CodeSystem.ConceptDefinitionComponent set : cs.getConcept()) { - all.add(set.getCode()); - } - for (ValueSet.ValueSetExpansionContainsComponent v : expanded.getExpansion().getContains()) { - all.remove(v.getCode()); - } - assertTrue(all.isEmpty(), String.join(", ", all)); - assertEquals(cs.getConcept().size(), expanded.getExpansion().getTotal()); } - - @Autowired - private IJobCoordinator myJobCoordinator; - - @Autowired - private ITermDeferredStorageSvc myDeferredStorageSvc; } diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java index 2fb5a9b87f6..8b98ddfe30e 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.jpa.test; +import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.jobs.export.BulkDataExportProvider; import ca.uhn.fhir.context.FhirContext; @@ -559,6 +560,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil @Autowired protected IJobMaintenanceService myJobMaintenanceService; + @Autowired + protected IJobCoordinator myJobCoordinator; @RegisterExtension private final PreventDanglingInterceptorsExtension myPreventDanglingInterceptorsExtension = new PreventDanglingInterceptorsExtension(()-> myInterceptorRegistry); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/BaseReindexStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/BaseReindexStep.java new file mode 100644 index 00000000000..ec999bee47c --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/BaseReindexStep.java @@ -0,0 +1,69 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; + +public class BaseReindexStep { + + public static final int REINDEX_MAX_RETRIES = 10; + + protected final HapiTransactionService myHapiTransactionService; + + protected IFhirSystemDao mySystemDao; + + protected DaoRegistry myDaoRegistry; + + protected IIdHelperService> myIdHelperService; + + public BaseReindexStep( + HapiTransactionService theHapiTransactionService, + IFhirSystemDao theSystemDao, + DaoRegistry theRegistry, + IIdHelperService> theIdHelperService + ) { + myHapiTransactionService = theHapiTransactionService; + mySystemDao = theSystemDao; + myDaoRegistry = theRegistry; + myIdHelperService = theIdHelperService; + } + + public ReindexResults doReindex( + ResourceIdListWorkChunkJson data, + IJobDataSink theDataSink, + String theInstanceId, + String theChunkId, + ReindexJobParameters theJobParameters) { + RequestDetails requestDetails = new SystemRequestDetails(); + requestDetails.setRetry(true); + requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); + + TransactionDetails transactionDetails = new TransactionDetails(); + ReindexTask.JobParameters jp = new ReindexTask.JobParameters(); + jp.setData(data) + .setRequestDetails(requestDetails) + .setTransactionDetails(transactionDetails) + .setDataSink(theDataSink) + .setInstanceId(theInstanceId) + .setChunkId(theChunkId) + .setJobParameters(theJobParameters); + + ReindexTask reindexJob = new ReindexTask( + jp, myDaoRegistry, mySystemDao, myIdHelperService + ); + + return myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(transactionDetails) + .withRequestPartitionId(data.getRequestPartitionId()) + .execute(reindexJob); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/CheckPendingReindexWorkStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/CheckPendingReindexWorkStep.java new file mode 100644 index 00000000000..29fddae8c7b --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/CheckPendingReindexWorkStep.java @@ -0,0 +1,42 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RetryChunkLaterException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.Nonnull; +import org.springframework.beans.factory.annotation.Autowired; + +public class CheckPendingReindexWorkStep implements IJobStepWorker { + + @Autowired + private ReindexJobService myReindexJobService; + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { + + ReindexResults results = theStepExecutionDetails.getData(); + + if (!results.getResourceToHasWorkToComplete().isEmpty()) { + if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceToHasWorkToComplete())) { + // give time for reindex work to complete + throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay()); + } + } + + return RunOutcome.SUCCESS; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java index 66ab72ca05c..6ea1dc50c79 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexAppCtx.java @@ -26,12 +26,20 @@ import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -40,12 +48,22 @@ public class ReindexAppCtx { public static final String JOB_REINDEX = "REINDEX"; + @Autowired + private HapiTransactionService myHapiTransactionService; + @Autowired + private IFhirSystemDao mySystemDao; + @Autowired + private DaoRegistry myRegistry; + @Autowired + private IIdHelperService> myIdHelperService; + + // Version 2 @Bean - public JobDefinition reindexJobDefinition(IBatch2DaoSvc theBatch2DaoSvc) { + public JobDefinition reindexJobDefinitionV2(IBatch2DaoSvc theBatch2DaoSvc) { return JobDefinition.newBuilder() .setJobDefinitionId(JOB_REINDEX) .setJobDescription("Reindex resources") - .setJobDefinitionVersion(1) + .setJobDefinitionVersion(2) .setParametersType(ReindexJobParameters.class) .setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc)) .gatedExecution() @@ -59,12 +77,42 @@ public class ReindexAppCtx { "Load IDs of resources to reindex", ResourceIdListWorkChunkJson.class, reindexLoadIdsStep(theBatch2DaoSvc)) - .addLastStep("reindex", + .addIntermediateStep("reindex-start", "Perform the resource reindex", - reindexStep()) + ReindexResults.class, + reindexStepV2()) + .addLastStep("reindex-pending-work", + "Waits for reindex work to complete.", + pendingWorkStep()) .build(); } + // Version 1 + @Bean + public JobDefinition reindexJobDefinitionV1(IBatch2DaoSvc theBatch2DaoSvc) { + return JobDefinition.newBuilder() + .setJobDefinitionId(JOB_REINDEX) + .setJobDescription("Reindex resources") + .setJobDefinitionVersion(1) + .setParametersType(ReindexJobParameters.class) + .setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc)) + .gatedExecution() + .addFirstStep( + "generate-ranges", + "Generate data ranges to reindex", + ChunkRangeJson.class, + reindexGenerateRangeChunksStep()) + .addIntermediateStep( + "load-ids", + "Load IDs of resources to reindex", + ResourceIdListWorkChunkJson.class, + reindexLoadIdsStep(theBatch2DaoSvc)) + .addLastStep("reindex-start", + "Start the resource reindex", + reindexStepV1()) + .build(); + } + @Bean public IJobStepWorker reindexGenerateRangeChunksStep() { return new GenerateRangeChunksStep<>(); @@ -83,15 +131,30 @@ public class ReindexAppCtx { } @Bean - public ReindexStep reindexStep() { - return new ReindexStep(); + public ReindexStepV1 reindexStepV1() { + return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService); + } + + @Bean + public ReindexStepV2 reindexStepV2() { + return new ReindexStepV2(jobService(), myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService); + } + + @Bean + public CheckPendingReindexWorkStep pendingWorkStep() { + return new CheckPendingReindexWorkStep(); } @Bean public ReindexProvider reindexProvider( - FhirContext theFhirContext, - IJobCoordinator theJobCoordinator, - IJobPartitionProvider theJobPartitionHandler) { + FhirContext theFhirContext, + IJobCoordinator theJobCoordinator, + IJobPartitionProvider theJobPartitionHandler) { return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler); } + + @Bean + public ReindexJobService jobService() { + return new ReindexJobService(myRegistry); + } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java deleted file mode 100644 index 2644f3af89f..00000000000 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java +++ /dev/null @@ -1,203 +0,0 @@ -/*- - * #%L - * hapi-fhir-storage-batch2-jobs - * %% - * Copyright (C) 2014 - 2024 Smile CDR, Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package ca.uhn.fhir.batch2.jobs.reindex; - -import ca.uhn.fhir.batch2.api.IJobDataSink; -import ca.uhn.fhir.batch2.api.IJobStepWorker; -import ca.uhn.fhir.batch2.api.JobExecutionFailedException; -import ca.uhn.fhir.batch2.api.RunOutcome; -import ca.uhn.fhir.batch2.api.StepExecutionDetails; -import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.jpa.api.dao.DaoRegistry; -import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; -import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; -import ca.uhn.fhir.jpa.api.dao.ReindexParameters; -import ca.uhn.fhir.jpa.api.svc.IIdHelperService; -import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; -import ca.uhn.fhir.parser.DataFormatException; -import ca.uhn.fhir.rest.api.server.RequestDetails; -import ca.uhn.fhir.rest.api.server.SystemRequestDetails; -import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; -import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; -import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; -import ca.uhn.fhir.util.StopWatch; -import jakarta.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class ReindexStep implements IJobStepWorker { - - public static final int REINDEX_MAX_RETRIES = 10; - - private static final Logger ourLog = LoggerFactory.getLogger(ReindexStep.class); - - @Autowired - private HapiTransactionService myHapiTransactionService; - - @Autowired - private IFhirSystemDao mySystemDao; - - @Autowired - private DaoRegistry myDaoRegistry; - - @Autowired - private IIdHelperService myIdHelperService; - - @Nonnull - @Override - public RunOutcome run( - @Nonnull StepExecutionDetails theStepExecutionDetails, - @Nonnull IJobDataSink theDataSink) - throws JobExecutionFailedException { - - ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); - ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); - - return doReindex( - data, - theDataSink, - theStepExecutionDetails.getInstance().getInstanceId(), - theStepExecutionDetails.getChunkId(), - jobParameters); - } - - @Nonnull - public RunOutcome doReindex( - ResourceIdListWorkChunkJson data, - IJobDataSink theDataSink, - String theInstanceId, - String theChunkId, - ReindexJobParameters theJobParameters) { - RequestDetails requestDetails = new SystemRequestDetails(); - requestDetails.setRetry(true); - requestDetails.setMaxRetries(REINDEX_MAX_RETRIES); - TransactionDetails transactionDetails = new TransactionDetails(); - ReindexJob reindexJob = new ReindexJob( - data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters); - - myHapiTransactionService - .withRequest(requestDetails) - .withTransactionDetails(transactionDetails) - .withRequestPartitionId(data.getRequestPartitionId()) - .execute(reindexJob); - - return new RunOutcome(data.size()); - } - - private class ReindexJob implements TransactionCallback { - private final ResourceIdListWorkChunkJson myData; - private final RequestDetails myRequestDetails; - private final TransactionDetails myTransactionDetails; - private final IJobDataSink myDataSink; - private final String myChunkId; - private final String myInstanceId; - private final ReindexJobParameters myJobParameters; - - public ReindexJob( - ResourceIdListWorkChunkJson theData, - RequestDetails theRequestDetails, - TransactionDetails theTransactionDetails, - IJobDataSink theDataSink, - String theInstanceId, - String theChunkId, - ReindexJobParameters theJobParameters) { - myData = theData; - myRequestDetails = theRequestDetails; - myTransactionDetails = theTransactionDetails; - myDataSink = theDataSink; - myInstanceId = theInstanceId; - myChunkId = theChunkId; - myJobParameters = theJobParameters; - myDataSink.setWarningProcessor(new ReindexWarningProcessor()); - } - - @Override - public Void doInTransaction(@Nonnull TransactionStatus theStatus) { - - List persistentIds = myData.getResourcePersistentIds(myIdHelperService); - - ourLog.info( - "Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", - persistentIds.size(), - myInstanceId, - myChunkId); - StopWatch sw = new StopWatch(); - - // Prefetch Resources from DB - - boolean reindexSearchParameters = - myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE; - mySystemDao.preFetchResources(persistentIds, reindexSearchParameters); - ourLog.info( - "Prefetched {} resources in {} - Instance[{}] Chunk[{}]", - persistentIds.size(), - sw, - myInstanceId, - myChunkId); - - ReindexParameters parameters = new ReindexParameters() - .setReindexSearchParameters(myJobParameters.getReindexSearchParameters()) - .setOptimizeStorage(myJobParameters.getOptimizeStorage()) - .setOptimisticLock(myJobParameters.getOptimisticLock()); - - // Reindex - - sw.restart(); - for (int i = 0; i < myData.size(); i++) { - - String nextResourceType = myData.getResourceType(i); - IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextResourceType); - IResourcePersistentId resourcePersistentId = persistentIds.get(i); - try { - - ReindexOutcome outcome = - dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails); - outcome.getWarnings().forEach(myDataSink::recoveredError); - - } catch (BaseServerResponseException | DataFormatException e) { - String resourceForcedId = myIdHelperService - .translatePidIdToForcedIdWithCache(resourcePersistentId) - .orElse(resourcePersistentId.toString()); - String resourceId = nextResourceType + "/" + resourceForcedId; - ourLog.debug("Failure during reindexing {}", resourceId, e); - myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage()); - } - } - - ourLog.info( - "Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]", - persistentIds.size(), - sw, - sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS), - myInstanceId, - myChunkId); - - return null; - } - } -} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV1.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV1.java new file mode 100644 index 00000000000..131445ee04b --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV1.java @@ -0,0 +1,66 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.reindex; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReindexStepV1 extends BaseReindexStep implements IJobStepWorker { + + private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class); + + public ReindexStepV1(HapiTransactionService theHapiTransactionService, IFhirSystemDao theSystemDao, DaoRegistry theRegistry, IIdHelperService> theIdHelperService) { + super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService); + } + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) + throws JobExecutionFailedException { + + ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); + ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); + + doReindex( + data, + theDataSink, + theStepExecutionDetails.getInstance().getInstanceId(), + theStepExecutionDetails.getChunkId(), + jobParameters); + + + return new RunOutcome(data.size()); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV2.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV2.java new file mode 100644 index 00000000000..f5453347f2a --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStepV2.java @@ -0,0 +1,65 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RetryChunkLaterException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import jakarta.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class ReindexStepV2 extends BaseReindexStep implements IJobStepWorker { + + + private ReindexJobService myReindexJobService; + + public ReindexStepV2(ReindexJobService theJobService, + HapiTransactionService theHapiTransactionService, IFhirSystemDao theSystemDao, DaoRegistry theRegistry, IIdHelperService> theIdHelperService) { + super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService); + myReindexJobService = theJobService; + } + + @Nonnull + @Override + public RunOutcome run(@Nonnull StepExecutionDetails theStepExecutionDetails, @Nonnull IJobDataSink theDataSink) throws JobExecutionFailedException { + ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); + ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters(); + + // This is not strictly necessary; + // but we'll ensure that no outstanding "reindex work" + // is waiting to be completed, so that when we do + // our reindex, it won't skip over data + Map resourceTypesToCheckFlag = new HashMap<>(); + data.getTypedPids().forEach(id -> { + // we don't really care about duplicates + resourceTypesToCheckFlag.put(id.getResourceType(), true); + }); + if (myReindexJobService.anyResourceHasPendingReindexWork(resourceTypesToCheckFlag)) { + throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay()); + } + + ReindexResults results = doReindex( + data, + theDataSink, + theStepExecutionDetails.getInstance().getInstanceId(), + theStepExecutionDetails.getChunkId(), + jobParameters); + + theDataSink.accept(results); + + return new RunOutcome(data.size()); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTask.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTask.java new file mode 100644 index 00000000000..9deb3419099 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTask.java @@ -0,0 +1,201 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.parser.DataFormatException; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; +import ca.uhn.fhir.util.StopWatch; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class ReindexTask implements TransactionCallback { + private static final Logger ourLog = LoggerFactory.getLogger(ReindexTask.class); + + public static class JobParameters { + private ResourceIdListWorkChunkJson myData; + private RequestDetails myRequestDetails; + private TransactionDetails myTransactionDetails; + private IJobDataSink myDataSink; + private String myChunkId; + private String myInstanceId; + private ReindexJobParameters myJobParameters; + + public ResourceIdListWorkChunkJson getData() { + return myData; + } + + public JobParameters setData(ResourceIdListWorkChunkJson theData) { + myData = theData; + return this; + } + + public RequestDetails getRequestDetails() { + return myRequestDetails; + } + + public JobParameters setRequestDetails(RequestDetails theRequestDetails) { + myRequestDetails = theRequestDetails; + return this; + } + + public TransactionDetails getTransactionDetails() { + return myTransactionDetails; + } + + public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) { + myTransactionDetails = theTransactionDetails; + return this; + } + + public IJobDataSink getDataSink() { + return myDataSink; + } + + public JobParameters setDataSink(IJobDataSink theDataSink) { + myDataSink = theDataSink; + return this; + } + + public String getChunkId() { + return myChunkId; + } + + public JobParameters setChunkId(String theChunkId) { + myChunkId = theChunkId; + return this; + } + + public String getInstanceId() { + return myInstanceId; + } + + public JobParameters setInstanceId(String theInstanceId) { + myInstanceId = theInstanceId; + return this; + } + + public ReindexJobParameters getJobParameters() { + return myJobParameters; + } + + public JobParameters setJobParameters(ReindexJobParameters theJobParameters) { + myJobParameters = theJobParameters; + return this; + } + } + + private final DaoRegistry myDaoRegistry; + private final IFhirSystemDao mySystemDao; + + private final IIdHelperService> myIdHelperService; + + private final ResourceIdListWorkChunkJson myData; + private final RequestDetails myRequestDetails; + private final TransactionDetails myTransactionDetails; + private final IJobDataSink myDataSink; + private final String myChunkId; + private final String myInstanceId; + private final ReindexJobParameters myJobParameters; + + public ReindexTask( + JobParameters theJobParameters, + DaoRegistry theRegistry, + IFhirSystemDao theSystemDao, + IIdHelperService> theIdHelperService + ) { + myDaoRegistry = theRegistry; + mySystemDao = theSystemDao; + myIdHelperService = theIdHelperService; + + myData = theJobParameters.getData(); + myRequestDetails = theJobParameters.getRequestDetails(); + myTransactionDetails = theJobParameters.getTransactionDetails(); + myDataSink = theJobParameters.getDataSink(); + myInstanceId = theJobParameters.getInstanceId(); + myChunkId = theJobParameters.getChunkId(); + myJobParameters = theJobParameters.getJobParameters(); + myDataSink.setWarningProcessor(new ReindexWarningProcessor()); + } + + @Override + public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) { + List> persistentIds = myData.getResourcePersistentIds(myIdHelperService); + + ourLog.info( + "Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", + persistentIds.size(), + myInstanceId, + myChunkId); + StopWatch sw = new StopWatch(); + ReindexResults reindexResults = new ReindexResults(); + + // Prefetch Resources from DB + boolean reindexSearchParameters = + myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE; + mySystemDao.preFetchResources(persistentIds, reindexSearchParameters); + ourLog.info( + "Prefetched {} resources in {} - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + myInstanceId, + myChunkId); + + ReindexParameters parameters = new ReindexParameters() + .setReindexSearchParameters(myJobParameters.getReindexSearchParameters()) + .setOptimizeStorage(myJobParameters.getOptimizeStorage()) + .setOptimisticLock(myJobParameters.getOptimisticLock()); + + // Reindex + + sw.restart(); + for (int i = 0; i < myData.size(); i++) { + + String nextResourceType = myData.getResourceType(i); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(nextResourceType); + IResourcePersistentId resourcePersistentId = persistentIds.get(i); + try { + + ReindexOutcome outcome = + dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails); + + outcome.getWarnings().forEach(myDataSink::recoveredError); + reindexResults.addResourceTypeToCompletionStatus(nextResourceType, + outcome.isHasPendingWork()); + + } catch (BaseServerResponseException | DataFormatException e) { + String resourceForcedId = myIdHelperService + .translatePidIdToForcedIdWithCache(resourcePersistentId) + .orElse(resourcePersistentId.toString()); + String resourceId = nextResourceType + "/" + resourceForcedId; + ourLog.error("Failure during reindexing {}", resourceId, e); + myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage()); + } + } + + ourLog.info( + "Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]", + persistentIds.size(), + sw, + sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS), + myInstanceId, + myChunkId); + + return reindexResults; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java new file mode 100644 index 00000000000..59c13b9d37a --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexUtils.java @@ -0,0 +1,28 @@ +package ca.uhn.fhir.batch2.jobs.reindex; + +import com.google.common.annotations.VisibleForTesting; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +public class ReindexUtils { + + private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS); + + private static Duration myDelay; + + /** + * Returns the retry delay for reindex jobs that require polling. + */ + public static Duration getRetryLaterDelay() { + if (myDelay != null) { + return myDelay; + } + return RETRY_DELAY; + } + + @VisibleForTesting + public static void setRetryDelay(Duration theDuration) { + myDelay = theDuration; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java new file mode 100644 index 00000000000..f20f952b87f --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/models/ReindexResults.java @@ -0,0 +1,30 @@ +package ca.uhn.fhir.batch2.jobs.reindex.models; + +import ca.uhn.fhir.model.api.IModelJson; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; + +public class ReindexResults implements IModelJson { + + /** + * A map of resource type : whether or not the reindex is completed; + * true = more work needed. false (or omitted) = reindex is done + */ + @JsonProperty("resource2NeedsWork") + private HashMap myResourceToHasWorkToComplete; + + public ReindexResults() {} + + public HashMap getResourceToHasWorkToComplete() { + if (myResourceToHasWorkToComplete == null) { + myResourceToHasWorkToComplete = new HashMap<>(); + } + return myResourceToHasWorkToComplete; + } + + public void addResourceTypeToCompletionStatus(String theResourceType, boolean theRequiresMoreWork) { + getResourceToHasWorkToComplete() + .put(theResourceType, theRequiresMoreWork); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java new file mode 100644 index 00000000000..39a4273a8b7 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/svcs/ReindexJobService.java @@ -0,0 +1,38 @@ +package ca.uhn.fhir.batch2.jobs.reindex.svcs; + +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; + +import java.util.Map; + +public class ReindexJobService { + + private final DaoRegistry myDaoRegistry; + + public ReindexJobService(DaoRegistry theRegistry) { + myDaoRegistry = theRegistry; + } + + /** + * Checks if any of the resource types in the map have any pending reindex work waiting. + * This will return true after the first such encounter, and only return false if no + * reindex work is required for any resource. + * @param theResourceTypesToCheckFlag map of resourceType:whether or not to check + * @return true if there's reindex work pending, false otherwise + */ + public boolean anyResourceHasPendingReindexWork(Map theResourceTypesToCheckFlag) { + for (String resourceType : theResourceTypesToCheckFlag.keySet()) { + boolean toCheck = theResourceTypesToCheckFlag.get(resourceType); + if (toCheck) { + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); + + ReindexJobStatus status = dao.getReindexJobStatus(); + if (status.isHasReindexWorkPending()) { + return true; + } + } + } + return false; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java similarity index 95% rename from hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java rename to hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java index 0fe1c00e461..5f3b62f91c5 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexJobParametersValidatorTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexTaskParametersValidatorTest.java @@ -13,7 +13,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(MockitoExtension.class) -public class ReindexJobParametersValidatorTest { +public class ReindexTaskParametersValidatorTest { @Mock private UrlListValidator myListValidator; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java index b7969f1ae1c..0ecacb9bdf6 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/chunk/ResourceIdListWorkChunkJson.java @@ -62,7 +62,7 @@ public class ResourceIdListWorkChunkJson implements IModelJson { return myRequestPartitionId; } - private List getTypedPids() { + public List getTypedPids() { if (myTypedPids == null) { myTypedPids = new ArrayList<>(); } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java index 7082fed8240..d918f5572ce 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirResourceDao.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.model.DeleteConflictList; import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.ExpungeOutcome; +import ca.uhn.fhir.jpa.api.model.ReindexJobStatus; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.entity.TagTypeEnum; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; @@ -315,12 +316,24 @@ public interface IFhirResourceDao extends IDao { * @param theResourcePersistentId The ID * @return */ + @SuppressWarnings("rawtypes") ReindexOutcome reindex( IResourcePersistentId theResourcePersistentId, ReindexParameters theReindexParameters, RequestDetails theRequest, TransactionDetails theTransactionDetails); + /** + * Returns ReindexJobStatus information object that tells the caller + * if a reindex job is still in progress or not. + * + * If the implementing DAO requires additional work during reindexing, + * this is the method to override. + */ + default ReindexJobStatus getReindexJobStatus() { + return ReindexJobStatus.NO_WORK_NEEDED; + } + void removeTag( IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode, RequestDetails theRequestDetails); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java index 8c7eb7fa5b9..744e1def01f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/ReindexOutcome.java @@ -29,6 +29,11 @@ public class ReindexOutcome { private List myWarnings; + /** + * True if there is additional (async) work to wait on. + */ + private boolean myHasPendingWork; + public List getWarnings() { return defaultIfNull(myWarnings, Collections.emptyList()); } @@ -39,4 +44,12 @@ public class ReindexOutcome { } myWarnings.add(theWarning); } + + public boolean isHasPendingWork() { + return myHasPendingWork; + } + + public void setHasPendingWork(boolean theHasPendingWork) { + myHasPendingWork = theHasPendingWork; + } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java new file mode 100644 index 00000000000..227756c5e44 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/ReindexJobStatus.java @@ -0,0 +1,16 @@ +package ca.uhn.fhir.jpa.api.model; + +public class ReindexJobStatus { + + public static ReindexJobStatus NO_WORK_NEEDED = new ReindexJobStatus(); + + private boolean myHasReindexWorkPending; + + public boolean isHasReindexWorkPending() { + return myHasReindexWorkPending; + } + + public void setHasReindexWorkPending(boolean theHasReindexWorkPending) { + myHasReindexWorkPending = theHasReindexWorkPending; + } +}