value set expansion (#6286)

TipzCM 2024-10-01 13:52:39 -04:00 committed by GitHub
parent 6bb72e445d
commit 62fa7ec787
44 changed files with 1550 additions and 359 deletions

View File

@@ -64,4 +64,12 @@
 			<property name="format" value="^(Base|Abstract).+$"/>
 		</module>
 	</module>
+	<!-- for suppression of rules; to use, surround code to exclude with comments: -->
+	<!-- CHECKSTYLE.OFF RuleToDisable AND CHECKSTYLE.ON RuleToDisable -->
+	<module name="SuppressWithPlainTextCommentFilter">
+		<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)" />
+		<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)" />
+	</module>
 </module>
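For reference, a suppression in Java source would look like the following; MethodName is a standard Checkstyle check, used here only for illustration:

	public class SuppressionExample {
		// CHECKSTYLE.OFF: MethodName
		void BADLY_NAMED_LEGACY_METHOD() {}
		// CHECKSTYLE.ON: MethodName
	}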

View File

@@ -0,0 +1,17 @@
+---
+type: fix
+issue: 6285
+title: "Updated the Reindex Batch2 job to include an additional step that
+    checks whether any pending 'reindex' work remains.
+    This was done to fix a bug in which value set expansion
+    would not return all of the existing CodeSystem concepts
+    after a reindex call, because some of the concepts had been
+    deferred to future job runs.
+    As a result, `$reindex` operations on CodeSystems will no longer
+    produce incorrect value set expansions when the expansion is
+    requested 'too soon' after the $reindex operation.
+"

View File

@@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.dao;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
 import ca.uhn.fhir.context.FhirVersionEnum;
@@ -158,6 +157,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
 import static java.util.Objects.isNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -1315,7 +1315,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
 		myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
 		JobInstanceStartRequest request = new JobInstanceStartRequest();
-		request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		request.setJobDefinitionId(JOB_REINDEX);
 		request.setParameters(params);
 		myJobCoordinator.startInstance(theRequestDetails, request);

View File

@@ -27,7 +27,11 @@ import ca.uhn.fhir.context.support.IValidationSupport.CodeValidationResult;
 import ca.uhn.fhir.context.support.LookupCodeRequest;
 import ca.uhn.fhir.context.support.ValidationSupportContext;
 import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
+import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
+import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
+import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
 import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
 import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@@ -176,6 +180,47 @@ public class JpaResourceDaoCodeSystem<T extends IBaseResource> extends BaseHapiF
 		myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);
 	}

+	/**
+	 * If there are more code systems to process
+	 * than {@link JpaStorageSettings#getDeferIndexingForCodesystemsOfSize()},
+	 * these codes will have their processing deferred to a later time.
+	 *
+	 * This can result in future reindex steps *skipping* these code systems (if
+	 * they're still deferred), and thus in incorrect expansions.
+	 *
+	 * So we override the reindex method for CodeSystems specifically to
+	 * force reindex batch jobs to wait until all code systems are processed before
+	 * moving on.
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public ReindexOutcome reindex(
+			IResourcePersistentId thePid,
+			ReindexParameters theReindexParameters,
+			RequestDetails theRequest,
+			TransactionDetails theTransactionDetails) {
+		ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails);
+		if (outcome.getWarnings().isEmpty()) {
+			outcome.setHasPendingWork(true);
+		}
+		return outcome;
+	}
+
+	@Override
+	public ReindexJobStatus getReindexJobStatus() {
+		boolean isQueueEmpty = myTermDeferredStorageSvc.isStorageQueueEmpty(true);
+
+		ReindexJobStatus status = new ReindexJobStatus();
+		status.setHasReindexWorkPending(!isQueueEmpty);
+		if (status.isHasReindexWorkPending()) {
+			// force a run
+			myTermDeferredStorageSvc.saveDeferred();
+		}
+		return status;
+	}
+
 	@Override
 	public ResourceTable updateEntity(
 			RequestDetails theRequest,
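For context, the deferral threshold referenced in the Javadoc above is configured on JpaStorageSettings. A minimal sketch, assuming the bean-style setter that matches the getter named in the Javadoc:

	import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;

	public class DeferralThresholdExample {
		public static void main(String[] args) {
			JpaStorageSettings settings = new JpaStorageSettings();
			// CodeSystems with more concepts than this are only partially indexed
			// inline; the rest are queued for deferred processing, which is the
			// work the reindex override above forces the batch job to wait for.
			settings.setDeferIndexingForCodesystemsOfSize(100);
			System.out.println("Deferral threshold: " + settings.getDeferIndexingForCodesystemsOfSize());
		}
	}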

View File

@@ -593,7 +593,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
 		if (theStatisticsTracker.getUpdatedConceptCount() <= myStorageSettings.getDeferIndexingForCodesystemsOfSize()) {
 			saveConcept(conceptToAdd);
 			Long nextConceptPid = conceptToAdd.getId();
-			Validate.notNull(nextConceptPid);
+			Objects.requireNonNull(nextConceptPid);
 		} else {
 			myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd);
 		}

View File

@@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.term;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
-import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
@@ -79,6 +78,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 	private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
 	private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
 	private boolean myAllowDeferredTasksTimeout = true;
+	private static final List<String> BATCH_JOBS_TO_CARE_ABOUT =
+			List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
 	private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
 	private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
 	private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
@@ -436,7 +437,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 		return retVal;
 	}

-	private boolean isJobsExecuting() {
+	public boolean isJobsExecuting() {
 		cleanseEndedJobs();
 		return !myJobExecutions.isEmpty();
@@ -448,15 +449,18 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 	 * This is mostly a fail-safe
 	 * because "cancelled" jobs are never removed.
 	 */
-		List<String> executions = new ArrayList<>(myJobExecutions);
 		List<String> idsToDelete = new ArrayList<>();
-		for (String id : executions) {
-			// TODO - might want to consider a "fetch all instances"
-			JobInstance instance = myJobCoordinator.getInstance(id);
-			if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) {
+		for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) {
+			List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
+					jobId,
+					true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only)
+					Math.max(myJobExecutions.size(), 1), // at most this many
+					0);
+			for (JobInstance instance : jobInstanceInEndedState) {
 				idsToDelete.add(instance.getInstanceId());
 			}
 		}

 		for (String id : idsToDelete) {
 			myJobExecutions.remove(id);
 		}

View File

@@ -78,6 +78,8 @@ public interface ITermDeferredStorageSvc {

 	void logQueueForUnitTest();

+	boolean isJobsExecuting();
+
 	/**
 	 * Only to be used from tests - Disallow test timeouts on deferred tasks
 	 */
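Exposing isJobsExecuting() on the interface lets callers wait for deferred terminology work to drain completely. A sketch of how a test might combine it with isStorageQueueEmpty(...); the Awaitility usage and timeout are illustrative:

	import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;

	import java.time.Duration;

	import static org.awaitility.Awaitility.await;

	public final class DeferredWorkTestUtil {

		private DeferredWorkTestUtil() {}

		public static void awaitDeferredWorkDrained(ITermDeferredStorageSvc theSvc) {
			// done only when no deletion jobs are running and the deferred queue is empty
			await().atMost(Duration.ofMinutes(1))
				.until(() -> !theSvc.isJobsExecuting() && theSvc.isStorageQueueEmpty(true));
		}
	}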

View File

@ -1,14 +1,11 @@
package ca.uhn.fhir.jpa.provider.dstu3; package ca.uhn.fhir.jpa.provider.dstu3;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
@ -46,9 +43,11 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -202,7 +201,7 @@ public class ResourceProviderCustomSearchParamDstu3Test extends BaseResourceProv
mySearchParameterDao.create(fooSp, mySrd); mySearchParameterDao.create(fooSp, mySrd);
runInTransaction(()->{ runInTransaction(()->{
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
assertEquals(1, allJobs.size()); assertEquals(1, allJobs.size());
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size()); assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl()); assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());

View File

@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
 import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
 import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1075,7 +1076,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test
 			parameters.addUrl(url);
 		}
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(parameters);
 		Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
 		ourLog.info("Started reindex job with id {}", res.getInstanceId());

View File

@@ -23,7 +23,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri;
 import ca.uhn.fhir.jpa.model.entity.StorageSettings;
 import ca.uhn.fhir.jpa.model.util.SearchParamHash;
 import ca.uhn.fhir.jpa.model.util.UcumServiceUtil;
-import ca.uhn.fhir.jpa.reindex.ReindexStepTest;
+import ca.uhn.fhir.jpa.reindex.ReindexStepV1Test;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.rest.param.BaseParam;
@@ -57,6 +57,7 @@ import org.springframework.data.jpa.repository.JpaRepository;

 import java.util.List;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -312,7 +313,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test {
 			parameters.addUrl(url);
 		}
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(parameters);
 		Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
 		ourLog.info("Started reindex job with id {}", res.getInstanceId());
@@ -321,7 +322,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test {
 	// Additional existing tests with enabled IndexStorageOptimized
 	@Nested
-	public class IndexStorageOptimizedReindexStepTest extends ReindexStepTest {
+	public class IndexStorageOptimizedReindexStepTestV1 extends ReindexStepV1Test {
 		@BeforeEach
 		void setUp() {
 			myStorageSettings.setIndexStorageOptimized(true);

View File

@@ -2,12 +2,15 @@ package ca.uhn.fhir.jpa.dao.r4;

 import ca.uhn.fhir.batch2.api.IJobDataSink;
 import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
 import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep;
+import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.support.ValidationSupportContext;
 import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
@@ -18,13 +21,13 @@ import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
 import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
 import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
 import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
-import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
 import ca.uhn.fhir.jpa.entity.TermValueSet;
 import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
 import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
+import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
 import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
@@ -146,7 +149,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 	@Autowired
 	private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
 	@Autowired
-	private ReindexStep myReindexStep;
+	private ReindexStepV1 myReindexStepV1;
 	@Autowired
 	private DeleteExpungeStep myDeleteExpungeStep;
 	@Autowired
@@ -1018,7 +1021,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		// insert to: HFJ_RESOURCE, HFJ_RES_VER, HFJ_RES_LINK
 		assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
 		assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
 	}

 	@ParameterizedTest
@@ -1031,7 +1033,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 	})
 	public void testReindexJob_OptimizeStorage(boolean theOptimisticLock, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageModeEnum, int theExpectedSelectCount, int theExpectedUpdateCount) {
 		// Setup
 		ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
 		IIdType patientId = createPatient(withActiveTrue());
 		IIdType orgId = createOrganization(withName("MY ORG"));
@@ -1056,7 +1057,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		// execute
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class));

 		// validate
 		assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(theExpectedSelectCount);
@@ -1064,7 +1072,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		assertThat(myCaptureQueriesListener.getInsertQueriesForCurrentThread()).isEmpty();
 		assertThat(myCaptureQueriesListener.getDeleteQueriesForCurrentThread()).isEmpty();
 		assertEquals(10, outcome.getRecordsProcessed());
 	}

 	@Test
@@ -1095,7 +1102,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		// execute
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class));
 		assertEquals(20, outcome.getRecordsProcessed());

 		// validate
@@ -1103,10 +1117,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
 		assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
 		assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
 	}

 	public void assertNoPartitionSelectors() {
 		List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread();
 		for (SqlQuery next : selectQueries) {
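The tests in this commit repeat the same StepExecutionDetails construction several times; a small helper (illustrative only, not part of the commit) captures the migration from the old doReindex(data, sink, instanceId, chunkId, params) call to ReindexStepV1#run(...):

	import ca.uhn.fhir.batch2.api.StepExecutionDetails;
	import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
	import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
	import ca.uhn.fhir.batch2.model.JobInstance;
	import ca.uhn.fhir.batch2.model.WorkChunk;

	public final class ReindexStepTestUtil {

		private ReindexStepTestUtil() {}

		// Builds the execution details ReindexStepV1#run(...) expects, carrying the
		// instance and chunk ids that used to be passed as plain strings.
		public static StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> newStepDetails(
				ReindexJobParameters theParams,
				ResourceIdListWorkChunkJson theData,
				String theInstanceId,
				String theChunkId) {
			JobInstance instance = new JobInstance();
			instance.setInstanceId(theInstanceId);
			WorkChunk chunk = new WorkChunk();
			chunk.setId(theChunkId);
			return new StepExecutionDetails<>(theParams, theData, instance, chunk);
		}
	}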

View File

@@ -1,6 +1,5 @@
 package ca.uhn.fhir.jpa.dao.r4;

-import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.context.RuntimeSearchParam;
 import ca.uhn.fhir.context.phonetic.PhoneticEncoderEnum;
@@ -22,7 +21,6 @@ import ca.uhn.fhir.rest.param.NumberParam;
 import ca.uhn.fhir.rest.param.ReferenceOrListParam;
 import ca.uhn.fhir.rest.param.ReferenceParam;
 import ca.uhn.fhir.rest.param.StringParam;
-import ca.uhn.fhir.rest.param.TokenOrListParam;
 import ca.uhn.fhir.rest.param.TokenParam;
 import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
 import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@@ -70,6 +68,7 @@ import org.springframework.transaction.support.TransactionTemplate;
 import java.util.List;
 import java.util.stream.Collectors;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
 import static org.apache.commons.lang3.StringUtils.countMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -441,11 +440,11 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
 		fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
 		fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);

-		List<JobInstance> initialJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
+		List<JobInstance> initialJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);

 		mySearchParameterDao.create(fooSp, mySrd);

-		List<JobInstance> finalJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
+		List<JobInstance> finalJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
 		List<JobInstance> newJobs = finalJobs.stream().filter(t -> !initialJobs.contains(t)).collect(Collectors.toList());
 		assertThat(newJobs.size()).as("number of jobs created").isEqualTo(1);
 	}

View File

@@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 @ExtendWith(MockitoExtension.class)
-public class ReindexStepTest {
+public class ReindexStepV1Test {

 	@Mock
 	private HapiTransactionService myHapiTransactionService;
@@ -34,7 +34,7 @@ public class ReindexStepTest {
 	private IJobDataSink<VoidModel> myDataSink;

 	@InjectMocks
-	private ReindexStep myReindexStep;
+	private ReindexStepV1 myReindexStepV1;

 	@Captor
 	private ArgumentCaptor<HapiTransactionService.ExecutionBuilder> builderArgumentCaptor;
@@ -51,7 +51,7 @@ public class ReindexStepTest {
 		when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();

 		// when
-		myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters);
+		myReindexStepV1.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters);

 		// then
 		assertMethodArgumentRequestPartitionId(expectedPartitionId);

View File

@ -1,8 +1,5 @@
package ca.uhn.fhir.jpa.provider.r4; package ca.uhn.fhir.jpa.provider.r4;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
@ -60,9 +57,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -244,7 +243,7 @@ public class ResourceProviderCustomSearchParamR4Test extends BaseResourceProvide
runInTransaction(() -> { runInTransaction(() -> {
myBatch2JobHelper.forceRunMaintenancePass(); myBatch2JobHelper.forceRunMaintenancePass();
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX); List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
assertEquals(1, allJobs.size()); assertEquals(1, allJobs.size());
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size()); assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl()); assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());

View File

@@ -2,10 +2,13 @@ package ca.uhn.fhir.jpa.reindex;

 import ca.uhn.fhir.batch2.api.IJobDataSink;
 import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexStepV1;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
@@ -25,13 +28,14 @@ import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
 import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

-public class ReindexStepTest extends BaseJpaR4Test {
+public class ReindexStepV1Test extends BaseJpaR4Test {

 	@Autowired
-	private ReindexStep myReindexStep;
+	private ReindexStepV1 myReindexStepV1;

 	@Mock
 	private IJobDataSink<VoidModel> myDataSink;
@@ -46,9 +50,7 @@ public class ReindexStepTest extends BaseJpaR4Test {

 	@Test
 	public void testReindex_NoActionNeeded() {
 		// Setup
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
 		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
@@ -57,9 +59,19 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		data.addTypedPid("Patient", id1);

 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		instance.setInstanceId("index-id");
+		WorkChunk chunk = new WorkChunk();
+		chunk.setId("chunk-id");
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			chunk
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);

 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -72,12 +84,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
 	}

 	@Test
 	public void testReindex_NoActionNeeded_IndexMissingFieldsEnabled() {
 		// Setup
 		myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED);
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
@@ -88,9 +97,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		data.addTypedPid("Patient", id1);

 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);

 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -121,9 +137,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		});

 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);

 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -136,12 +159,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
 	}

 	@Test
 	public void testReindex_IndexesAddedAndRemoved_IndexMissingFieldsEnabled() {
 		// Setup
 		myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED);
 		boolean markResourcesForReindexingUponSearchParameterChange = myStorageSettings.isMarkResourcesForReindexingUponSearchParameterChange();
 		myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false); // if this were true, it would set up a lot of reindex jobs extraneous to the one we're trying to test
@@ -189,9 +209,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		mySearchParamRegistry.forceRefresh();

 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);

 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -207,9 +234,7 @@ public class ReindexStepTest extends BaseJpaR4Test {

 	@Test
 	public void testReindex_OneResourceReindexFailedButOthersSucceeded() {
 		// Setup
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
 		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
 		Long idPatientToInvalidate = createPatient().getIdPartAsLong();
@@ -234,9 +259,19 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		});

 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		instance.setInstanceId("index-id");
+		WorkChunk workChunk = new WorkChunk();
+		workChunk.setId("workid");
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+			params,
+			data,
+			instance,
+			workChunk
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);

 		// Verify
 		assertEquals(4, outcome.getRecordsProcessed());

View File

@@ -2,7 +2,6 @@ package ca.uhn.fhir.jpa.reindex;

 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.api.IJobPersistence;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@@ -39,6 +38,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.stream.Stream;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;

 @SuppressWarnings("SqlDialectInspection")
-public class ReindexJobTest extends BaseJpaR4Test {
+public class ReindexTaskTest extends BaseJpaR4Test {

 	@Autowired
 	private IJobCoordinator myJobCoordinator;
@@ -101,7 +101,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(
 			new ReindexJobParameters()
 				.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
@@ -158,7 +158,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(
 			new ReindexJobParameters()
 				.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
@@ -217,7 +217,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(
 			new ReindexJobParameters()
 				.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
@@ -252,7 +252,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(
 			new ReindexJobParameters()
 				.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
@@ -294,7 +294,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(parameters);
 		Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
 		myBatch2JobHelper.awaitJobCompletion(res);
@@ -325,7 +325,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(parameters);
 		Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
 		JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);
@@ -356,7 +356,7 @@
 		parameters.addUrl("Observation?status=final");

 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(parameters);
 		Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
 		myBatch2JobHelper.awaitJobCompletion(res);
@@ -387,7 +387,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
 		myBatch2JobHelper.awaitJobCompletion(startResponse);
@@ -406,7 +406,7 @@
 		DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter();

 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
 		JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse);
@@ -436,7 +436,7 @@
 		// Run a reindex
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
 		JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
@@ -469,7 +469,7 @@
 		// Run a reindex
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
 		JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 999);
@@ -500,7 +500,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
 		JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
@@ -528,7 +528,7 @@
 		// execute
 		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
 		startRequest.setParameters(new ReindexJobParameters());
 		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
 		JobInstance outcome = myBatch2JobHelper.awaitJobFailure(startResponse);
@@ -541,7 +541,7 @@
 	@Test
 	public void testReindex_withReindexingUponSearchParameterChangeEnabled_reindexJobCompleted() {
-		List<JobInstance> jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0);
+		List<JobInstance> jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0);
 		assertThat(jobInstances).isEmpty();

 		// make sure the resources auto-reindex after the search parameter update is enabled
@@ -552,7 +552,7 @@
 		myReindexTestHelper.createCodeSearchParameter();

 		// check that reindex job was created
-		jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0);
+		jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(JOB_REINDEX, 10, 0);
 		assertThat(jobInstances).hasSize(1);

 		// check that the job is completed (not stuck in QUEUED status)

View File

@ -1,9 +1,8 @@
package ca.uhn.fhir.jpa.reindex; package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@ -25,9 +24,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test {
@Autowired @Autowired
private IJobCoordinator myJobCoordinator; private IJobCoordinator myJobCoordinator;
@ -133,7 +133,7 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
// execute // execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); startRequest.setJobDefinitionId(JOB_REINDEX);
startRequest.setParameters(parameters); startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest); Batch2JobStartResponse res = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res); JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(res);


@@ -1,5 +1,8 @@
package ca.uhn.fhir.jpa.term;

+import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
+import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.context.support.ConceptValidationOptions;
@@ -9,6 +12,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoValueSet;
+import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
@@ -25,7 +29,10 @@ import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet;
import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.jpa.util.ValueSetTestUtil;
+import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
+import ca.uhn.fhir.rest.api.server.RequestDetails;
+import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import com.google.common.collect.Lists;
@@ -51,10 +58,16 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@@ -2081,7 +2094,172 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet
		outcome = myValueSetDao.validateCode(vs.getUrlElement(), null, new StringType("A"), cs.getUrlElement(), null, null, null, mySrd);
		assertEquals(false, outcome.isOk());
		assertThat(outcome.getMessage()).contains("Code validation occurred using a ValueSet expansion that was pre-calculated");
	}
@Test
public void reindexCodeSystems_withDeferredCodeSystems_reindexesAllCodeSystems() {
// setup
int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize();
try {
/**
* The deferred count must be less than the number of
* concepts we are going to be uploading.
* That way, when we do the reindex, it will defer
* the additional concepts to a later job run.
*
* See {@link TermCodeSystemStorageSvcImpl#addConceptInHierarchy(TermCodeSystemVersion, Collection, TermConcept, UploadStatistics, Map, int)}
*
* Our CodeSystem below only has 6 Concepts to add.
* So we'll set the deferred count to 3 (so 3 will be deferred)
*/
myStorageSettings.setDeferIndexingForCodesystemsOfSize(3);
/*
* We're also setting our retry delay to a short timeframe
* so this test doesn't run too long.
*/
ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));
IParser parser = myFhirContext.newJsonParser();
RequestDetails rq = new SystemRequestDetails();
CodeSystem cs;
ValueSet vs;
String csStr;
{
String vsStr = """
{
"resourceType": "ValueSet",
"id": "0447bffa-01fa-4405-828a-96192e74a5d8",
"meta": {
"versionId": "2",
"lastUpdated": "2024-04-09T15:06:24.025+00:00",
"source": "#f4491e490a6a2900"
},
"url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types",
"version": "1.0.0",
"name": "IDMS-SUBMISSION-TYPES",
"title": "IDMS Submission Types",
"status": "active",
"experimental": false,
"date": "2023-09-28",
"publisher": "IDMS",
"description": "List of Submission Types",
"compose": {
"include": [
{
"system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types"
}
]
}
}
""";
vs = parser.parseResource(ValueSet.class, vsStr);
csStr = """
{
"resourceType": "CodeSystem",
"id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3",
"meta": {
"versionId": "14",
"lastUpdated": "2024-06-03T17:49:56.580+00:00",
"source": "#261a82258b0978a8"
},
"url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types",
"version": "1.0.0",
"name": "IDMS-Internal-Submission-Types",
"status": "active",
"date": "2023-09-07",
"publisher": "IDMS",
"description": "This contains a lists of codes Submission Type Codes.",
"content": "complete",
"concept": [
{
"code": "SUB-BRAND-PRODUCT",
"display": "New Brand Product (Non-New Chemical Entity)"
},
{
"code": "SUB-CLASSIFICATION",
"display": "Classification Change"
},
{
"code": "SUB-CLINICIAN-LED-SUBMISSIONS",
"display": "Clinician-Led Submissions"
},
{
"code": "SUB-DELISTING",
"display": "Delisting"
},
{
"code": "SUB-DIN-CHANGE",
"display": "Drug Identification Number (DIN) Change"
},
{
"code": "SUB-DISTRIBUTOR",
"display": "Distributor Change"
}
]
}
""";
cs = parser.parseResource(CodeSystem.class, csStr);
}
// create our ValueSet
myValueSetDao.update(vs, rq);
// and the code system
myCodeSystemDao.update(cs, rq);
// sanity check to make sure our code system was actually created
SearchParameterMap spMap = new SearchParameterMap();
spMap.setLoadSynchronous(true);
IBundleProvider bp = myCodeSystemDao.search(spMap, rq);
assertEquals(1, bp.getAllResources().size());
IBaseResource baseResource = bp.getAllResources().get(0);
CodeSystem cssaved;
if (baseResource instanceof CodeSystem saved) {
cssaved = saved;
} else {
fail("Should be a code system");
return;
}
assertEquals(cs.getConcept().size(), cssaved.getConcept().size());
// test
// perform the reindex (we'll only target the CodeSystem here)
ReindexJobParameters params = new ReindexJobParameters();
params.addUrl("CodeSystem?");
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(JOB_REINDEX);
startRequest.setParameters(params);
// and wait for it to complete
Batch2JobStartResponse response = myJobCoordinator.startInstance(rq, startRequest);
myBatch2JobHelper.awaitJobCompletion(response);
// verify by performing the value set expansion
ValueSetExpansionOptions options = new ValueSetExpansionOptions();
options.setCount(200); // this is way more than exist, so it's ok
ValueSet expanded = myValueSetDao.expand(vs, options);
assertNotNull(expanded);
/*
* If the reindex was performed correctly, the expanded ValueSet
* should contain all the CodeSystem concepts that we originally
* uploaded (and nothing else).
*/
HashSet<String> all = new HashSet<>();
for (CodeSystem.ConceptDefinitionComponent set : cs.getConcept()) {
all.add(set.getCode());
}
for (ValueSet.ValueSetExpansionContainsComponent v : expanded.getExpansion().getContains()) {
all.remove(v.getCode());
}
assertTrue(all.isEmpty(), String.join(", ", all));
assertEquals(cs.getConcept().size(), expanded.getExpansion().getTotal());
} finally {
// set back to standard values
myStorageSettings.setDeferIndexingForCodesystemsOfSize(deferredIndexingDefault);
ReindexUtils.setRetryDelay(null);
}
}
}
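For readers who want to exercise the same flow end-to-end over REST, a minimal client-side sketch follows (not part of this commit; the FhirContext setup and server base URL are assumptions). It submits the same "CodeSystem?" URL that the test above passes to ReindexJobParameters:

	FhirContext ctx = FhirContext.forR4();
	IGenericClient client = ctx.newRestfulGenericClient("http://localhost:8080/fhir"); // assumed base URL

	// Ask the server to reindex every CodeSystem resource
	Parameters inParams = new Parameters();
	inParams.addParameter("url", "CodeSystem?");

	Parameters outParams = client.operation()
			.onServer()
			.named(ProviderConstants.OPERATION_REINDEX)
			.withParameters(inParams)
			.execute();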


@@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.dao.r5;

-import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
@@ -19,6 +18,7 @@ import org.hl7.fhir.r5.model.Reference;
import org.hl7.fhir.r5.model.SearchParameter;
import org.junit.jupiter.api.Test;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class DuplicateIndexR5Test extends BaseJpaR5Test {
@@ -149,7 +149,7 @@ public class DuplicateIndexR5Test extends BaseJpaR5Test {
		ReindexJobParameters parameters = new ReindexJobParameters();
		parameters.addUrl("Patient?");
		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
		startRequest.setParameters(parameters);
		Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
		myBatch2JobHelper.awaitJobCompletion(res.getInstanceId());


@@ -1,7 +1,6 @@
package ca.uhn.fhir.jpa.provider.r5;

import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@@ -67,6 +66,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -374,7 +374,7 @@ public class ResourceProviderR5Test extends BaseResourceProviderR5Test {
		ReindexJobParameters jobParameters = new ReindexJobParameters();
		jobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(RequestPartitionId.allPartitions()));
		JobInstanceStartRequest request = new JobInstanceStartRequest();
-		request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		request.setJobDefinitionId(JOB_REINDEX);
		request.setParameters(jobParameters);
		Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);


@@ -19,6 +19,7 @@
 */
package ca.uhn.fhir.jpa.test;

+import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.jobs.export.BulkDataExportProvider;
import ca.uhn.fhir.context.FhirContext;
@@ -559,6 +560,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
	@Autowired
	protected IJobMaintenanceService myJobMaintenanceService;

+	@Autowired
+	protected IJobCoordinator myJobCoordinator;

	@RegisterExtension
	private final PreventDanglingInterceptorsExtension myPreventDanglingInterceptorsExtension = new PreventDanglingInterceptorsExtension(()-> myInterceptorRegistry);


@@ -38,6 +38,7 @@ import org.junit.jupiter.params.provider.Arguments;

import java.util.List;
import java.util.stream.Stream;

+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -170,7 +171,7 @@ public class PatientReindexTestHelper {
	private JobInstanceStartRequest createPatientReindexRequest(int theBatchSize) {
		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
-		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setJobDefinitionId(JOB_REINDEX);
		ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
		reindexJobParameters.setBatchSize(Math.max(theBatchSize,1));


@@ -17,11 +17,15 @@ import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;

+import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.times;
@@ -67,8 +71,15 @@ public class TermDeferredStorageSvcImplTest {
		ReflectionTestUtils.setField(mySvc, "myJobExecutions", mockExecutions);

-		when(myJobCoordinator.getInstance(eq(jobId)))
-			.thenReturn(instance);
+		when(myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
+				eq(TERM_CODE_SYSTEM_DELETE_JOB_NAME),
+				eq(true),
+				anyInt(),
+				eq(0)
+			))
+			.thenReturn(List.of()) // first nothing
+			.thenReturn(List.of(instance)); // then the list with the instance

		assertFalse(mySvc.isStorageQueueEmpty(true));
		instance.setStatus(StatusEnum.COMPLETED);
		assertTrue(mySvc.isStorageQueueEmpty(true));


@@ -21,69 +21,37 @@ package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
-import ca.uhn.fhir.batch2.api.IJobStepWorker;
-import ca.uhn.fhir.batch2.api.VoidModel;
-import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
-import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
-import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
-import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
-import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
-import ca.uhn.fhir.batch2.model.JobDefinition;
+import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
+import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexV1Config;
+import ca.uhn.fhir.batch2.jobs.reindex.v2.ReindexV2Config;
import ca.uhn.fhir.context.FhirContext;
-import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
-import ca.uhn.fhir.rest.server.provider.ProviderConstants;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
+import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;

@Configuration
+@Import({ReindexV1Config.class, ReindexV2Config.class})
public class ReindexAppCtx {

-	public static final String JOB_REINDEX = "REINDEX";
+	@Autowired
+	private HapiTransactionService myHapiTransactionService;

-	@Bean
-	public JobDefinition<ReindexJobParameters> reindexJobDefinition(IBatch2DaoSvc theBatch2DaoSvc) {
-		return JobDefinition.newBuilder()
-				.setJobDefinitionId(JOB_REINDEX)
-				.setJobDescription("Reindex resources")
-				.setJobDefinitionVersion(1)
-				.setParametersType(ReindexJobParameters.class)
-				.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
-				.gatedExecution()
-				.addFirstStep(
-						"generate-ranges",
-						"Generate data ranges to reindex",
-						ChunkRangeJson.class,
-						reindexGenerateRangeChunksStep())
-				.addIntermediateStep(
-						"load-ids",
-						"Load IDs of resources to reindex",
-						ResourceIdListWorkChunkJson.class,
-						reindexLoadIdsStep(theBatch2DaoSvc))
-				.addLastStep("reindex", "Perform the resource reindex", reindexStep())
-				.build();
-	}
+	@Autowired
+	private IFhirSystemDao<?, ?> mySystemDao;

-	@Bean
-	public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
-		return new GenerateRangeChunksStep<>();
-	}
+	@Autowired
+	private DaoRegistry myRegistry;

-	@Bean
-	public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
-			IBatch2DaoSvc theBatch2DaoSvc) {
-		return new LoadIdsStep<>(theBatch2DaoSvc);
-	}
+	@Autowired
+	private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;

-	@Bean
-	public ReindexJobParametersValidator reindexJobParametersValidator(IBatch2DaoSvc theBatch2DaoSvc) {
-		return new ReindexJobParametersValidator(
-				new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
-	}
-
-	@Bean
-	public ReindexStep reindexStep() {
-		return new ReindexStep();
-	}
+	/* Shared services */

	@Bean
	public ReindexProvider reindexProvider(
@@ -92,4 +60,9 @@ public class ReindexAppCtx {
			IJobPartitionProvider theJobPartitionHandler) {
		return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler);
	}
+
+	@Bean
+	public ReindexJobService jobService() {
+		return new ReindexJobService(myRegistry);
+	}
}


@@ -44,6 +44,7 @@ import java.util.stream.Collectors;

import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS;
+import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;

public class ReindexProvider {
@@ -127,7 +128,7 @@ public class ReindexProvider {
		myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);

		JobInstanceStartRequest request = new JobInstanceStartRequest();
-		request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		request.setJobDefinitionId(JOB_REINDEX);
		request.setParameters(params);
		Batch2JobStartResponse response = myJobCoordinator.startInstance(theRequestDetails, request);


@@ -1,203 +0,0 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
public static final int REINDEX_MAX_RETRIES = 10;
private static final Logger ourLog = LoggerFactory.getLogger(ReindexStep.class);
@Autowired
private HapiTransactionService myHapiTransactionService;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IIdHelperService<IResourcePersistentId> myIdHelperService;
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
return doReindex(
data,
theDataSink,
theStepExecutionDetails.getInstance().getInstanceId(),
theStepExecutionDetails.getChunkId(),
jobParameters);
}
@Nonnull
public RunOutcome doReindex(
ResourceIdListWorkChunkJson data,
IJobDataSink<VoidModel> theDataSink,
String theInstanceId,
String theChunkId,
ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
TransactionDetails transactionDetails = new TransactionDetails();
ReindexJob reindexJob = new ReindexJob(
data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters);
myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob);
return new RunOutcome(data.size());
}
private class ReindexJob implements TransactionCallback<Void> {
private final ResourceIdListWorkChunkJson myData;
private final RequestDetails myRequestDetails;
private final TransactionDetails myTransactionDetails;
private final IJobDataSink<VoidModel> myDataSink;
private final String myChunkId;
private final String myInstanceId;
private final ReindexJobParameters myJobParameters;
public ReindexJob(
ResourceIdListWorkChunkJson theData,
RequestDetails theRequestDetails,
TransactionDetails theTransactionDetails,
IJobDataSink<VoidModel> theDataSink,
String theInstanceId,
String theChunkId,
ReindexJobParameters theJobParameters) {
myData = theData;
myRequestDetails = theRequestDetails;
myTransactionDetails = theTransactionDetails;
myDataSink = theDataSink;
myInstanceId = theInstanceId;
myChunkId = theChunkId;
myJobParameters = theJobParameters;
myDataSink.setWarningProcessor(new ReindexWarningProcessor());
}
@Override
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
List<IResourcePersistentId> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
ourLog.info(
"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
myInstanceId,
myChunkId);
StopWatch sw = new StopWatch();
// Prefetch Resources from DB
boolean reindexSearchParameters =
myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
ourLog.info(
"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
myInstanceId,
myChunkId);
ReindexParameters parameters = new ReindexParameters()
.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
.setOptimizeStorage(myJobParameters.getOptimizeStorage())
.setOptimisticLock(myJobParameters.getOptimisticLock());
// Reindex
sw.restart();
for (int i = 0; i < myData.size(); i++) {
String nextResourceType = myData.getResourceType(i);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
try {
ReindexOutcome outcome =
dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
outcome.getWarnings().forEach(myDataSink::recoveredError);
} catch (BaseServerResponseException | DataFormatException e) {
String resourceForcedId = myIdHelperService
.translatePidIdToForcedIdWithCache(resourcePersistentId)
.orElse(resourcePersistentId.toString());
String resourceId = nextResourceType + "/" + resourceForcedId;
ourLog.debug("Failure during reindexing {}", resourceId, e);
myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
}
}
ourLog.info(
"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
myInstanceId,
myChunkId);
return null;
}
}
}


@@ -0,0 +1,39 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class ReindexUtils {
/**
* The reindex job definition id
*/
public static final String JOB_REINDEX = "REINDEX";
public static final int REINDEX_MAX_RETRIES = 10;
private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS);
private static Duration myDelay;
/**
* Returns the retry delay for reindex jobs that require polling.
*/
public static Duration getRetryLaterDelay() {
if (myDelay != null) {
return myDelay;
}
return RETRY_DELAY;
}
/**
* Sets the retry delay to use for reindex jobs.
* Do not use this in production code! Only test code.
*/
@VisibleForTesting
public static void setRetryDelay(Duration theDuration) {
myDelay = theDuration;
}
}
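Because the retry delay above is mutable for tests only, a typical test usage looks like this sketch (hypothetical test code, mirroring the ValueSetExpansionR4Test change earlier in this commit):

	// Shorten the poll interval so chunks parked by RetryChunkLaterException are retried quickly
	ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));
	try {
		// ... start the reindex job and await completion ...
	} finally {
		// Always restore the 30-second production default
		ReindexUtils.setRetryDelay(null);
	}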


@@ -0,0 +1,29 @@
package ca.uhn.fhir.batch2.jobs.reindex.models;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
public class ReindexResults implements IModelJson {
/**
* A map of resource type : whether that resource type still has reindex work pending;
* true = more work needed, false (or omitted) = reindex is done
*/
@JsonProperty("resource2NeedsWork")
private HashMap<String, Boolean> myResourceToHasWorkToComplete;
public ReindexResults() {}
public HashMap<String, Boolean> getResourceToHasWorkToComplete() {
if (myResourceToHasWorkToComplete == null) {
myResourceToHasWorkToComplete = new HashMap<>();
}
return myResourceToHasWorkToComplete;
}
public void addResourceTypeToCompletionStatus(String theResourceType, boolean theRequiresMoreWork) {
getResourceToHasWorkToComplete().put(theResourceType, theRequiresMoreWork);
}
}
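To illustrate the contract (a hypothetical snippet, not from this commit): the reindex step records, per resource type, whether deferred work remains, and the follow-up step only polls the types flagged true.

	ReindexResults results = new ReindexResults();
	results.addResourceTypeToCompletionStatus("CodeSystem", true); // deferred concepts still pending
	results.addResourceTypeToCompletionStatus("Patient", false); // fully reindexed

	// CheckPendingReindexWorkStep (below) keys off any "true" entries
	boolean anyPending = results.getResourceToHasWorkToComplete().containsValue(true);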


@@ -0,0 +1,38 @@
package ca.uhn.fhir.batch2.jobs.reindex.svcs;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
import java.util.Map;
public class ReindexJobService {
private final DaoRegistry myDaoRegistry;
public ReindexJobService(DaoRegistry theRegistry) {
myDaoRegistry = theRegistry;
}
/**
* Checks whether any of the resource types in the map have pending reindex work waiting.
* This returns true at the first such encounter, and returns false only if no
* reindex work is required for any resource type.
* @param theResourceTypesToCheckFlag map of resourceType : whether or not to check it
* @return true if there's reindex work pending, false otherwise
*/
public boolean anyResourceHasPendingReindexWork(Map<String, Boolean> theResourceTypesToCheckFlag) {
for (String resourceType : theResourceTypesToCheckFlag.keySet()) {
boolean toCheck = theResourceTypesToCheckFlag.get(resourceType);
if (toCheck) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
ReindexJobStatus status = dao.getReindexJobStatus();
if (status.isHasReindexWorkPending()) {
return true;
}
}
}
return false;
}
}
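The steps below consume this check as a gate; this is how ReindexStepV2 uses it (the Map literal here is illustrative):

	Map<String, Boolean> toCheck = Map.of("CodeSystem", true, "Patient", false);
	if (reindexJobService.anyResourceHasPendingReindexWork(toCheck)) {
		// Park this chunk; batch2 will re-run it after the configured delay
		throw new RetryChunkLaterException(Msg.code(2552), ReindexUtils.getRetryLaterDelay());
	}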


@@ -0,0 +1,57 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.reindex.v1;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@Deprecated(forRemoval = true, since = "7.6.0")
public class ReindexJobParametersValidatorV1 implements IJobParametersValidator<ReindexJobParameters> {
private final IUrlListValidator myUrlListValidator;
public ReindexJobParametersValidatorV1(IUrlListValidator theUrlListValidator) {
myUrlListValidator = theUrlListValidator;
}
@Nullable
@Override
public List<String> validate(RequestDetails theRequestDetails, @Nonnull ReindexJobParameters theParameters) {
List<String> errors = myUrlListValidator.validateUrls(theParameters.getUrls());
if (errors == null || errors.isEmpty()) {
// only check if there's no other errors (new list to fix immutable issues)
errors = new ArrayList<>();
for (String url : theParameters.getUrls()) {
if (url.contains(" ") || url.contains("\n") || url.contains("\t")) {
errors.add("Invalid URL. URL cannot contain spaces : " + url);
}
}
}
return errors;
}
}
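A quick illustration of what this validator accepts and rejects (hypothetical values; the error text matches the implementation above):

	ReindexJobParameters params = new ReindexJobParameters();
	params.addUrl("Observation?status=final"); // passes
	params.addUrl("Observation?code=a b"); // rejected, URLs may not contain whitespace

	List<String> errors = validator.validate(theRequestDetails, params);
	// errors -> ["Invalid URL. URL cannot contain spaces : Observation?code=a b"]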


@@ -0,0 +1,117 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.reindex.v1;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES;
@Deprecated(forRemoval = true, since = "7.6.0")
public class ReindexStepV1 implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class);
private final HapiTransactionService myHapiTransactionService;
private final IFhirSystemDao<?, ?> mySystemDao;
private final DaoRegistry myDaoRegistry;
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
public ReindexStepV1(
HapiTransactionService theHapiTransactionService,
IFhirSystemDao<?, ?> theSystemDao,
DaoRegistry theRegistry,
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
myDaoRegistry = theRegistry;
myHapiTransactionService = theHapiTransactionService;
mySystemDao = theSystemDao;
myIdHelperService = theIdHelperService;
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
doReindex(
data,
theDataSink,
theStepExecutionDetails.getInstance().getInstanceId(),
theStepExecutionDetails.getChunkId(),
jobParameters);
return new RunOutcome(data.size());
}
public ReindexResults doReindex(
ResourceIdListWorkChunkJson data,
IJobDataSink<?> theDataSink,
String theInstanceId,
String theChunkId,
ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
TransactionDetails transactionDetails = new TransactionDetails();
ReindexTaskV1.JobParameters jp = new ReindexTaskV1.JobParameters();
jp.setData(data)
.setRequestDetails(requestDetails)
.setTransactionDetails(transactionDetails)
.setDataSink(theDataSink)
.setInstanceId(theInstanceId)
.setChunkId(theChunkId)
.setJobParameters(theJobParameters);
ReindexTaskV1 reindexJob = new ReindexTaskV1(jp, myDaoRegistry, mySystemDao, myIdHelperService);
return myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob);
}
}


@@ -0,0 +1,202 @@
package ca.uhn.fhir.batch2.jobs.reindex.v1;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexWarningProcessor;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Deprecated(forRemoval = true, since = "7.6.0")
public class ReindexTaskV1 implements TransactionCallback<ReindexResults> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexTaskV1.class);
public static class JobParameters {
private ResourceIdListWorkChunkJson myData;
private RequestDetails myRequestDetails;
private TransactionDetails myTransactionDetails;
private IJobDataSink<?> myDataSink;
private String myChunkId;
private String myInstanceId;
private ReindexJobParameters myJobParameters;
public ResourceIdListWorkChunkJson getData() {
return myData;
}
public JobParameters setData(ResourceIdListWorkChunkJson theData) {
myData = theData;
return this;
}
public RequestDetails getRequestDetails() {
return myRequestDetails;
}
public JobParameters setRequestDetails(RequestDetails theRequestDetails) {
myRequestDetails = theRequestDetails;
return this;
}
public TransactionDetails getTransactionDetails() {
return myTransactionDetails;
}
public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) {
myTransactionDetails = theTransactionDetails;
return this;
}
public IJobDataSink<?> getDataSink() {
return myDataSink;
}
public JobParameters setDataSink(IJobDataSink<?> theDataSink) {
myDataSink = theDataSink;
return this;
}
public String getChunkId() {
return myChunkId;
}
public JobParameters setChunkId(String theChunkId) {
myChunkId = theChunkId;
return this;
}
public String getInstanceId() {
return myInstanceId;
}
public JobParameters setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
return this;
}
public ReindexJobParameters getJobParameters() {
return myJobParameters;
}
public JobParameters setJobParameters(ReindexJobParameters theJobParameters) {
myJobParameters = theJobParameters;
return this;
}
}
private final DaoRegistry myDaoRegistry;
private final IFhirSystemDao<?, ?> mySystemDao;
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
private final ResourceIdListWorkChunkJson myData;
private final RequestDetails myRequestDetails;
private final TransactionDetails myTransactionDetails;
private final IJobDataSink<?> myDataSink;
private final String myChunkId;
private final String myInstanceId;
private final ReindexJobParameters myJobParameters;
public ReindexTaskV1(
JobParameters theJobParameters,
DaoRegistry theRegistry,
IFhirSystemDao<?, ?> theSystemDao,
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
myDaoRegistry = theRegistry;
mySystemDao = theSystemDao;
myIdHelperService = theIdHelperService;
myData = theJobParameters.getData();
myRequestDetails = theJobParameters.getRequestDetails();
myTransactionDetails = theJobParameters.getTransactionDetails();
myDataSink = theJobParameters.getDataSink();
myInstanceId = theJobParameters.getInstanceId();
myChunkId = theJobParameters.getChunkId();
myJobParameters = theJobParameters.getJobParameters();
myDataSink.setWarningProcessor(new ReindexWarningProcessor());
}
@Override
public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) {
List<IResourcePersistentId<?>> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
ourLog.info(
"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
myInstanceId,
myChunkId);
StopWatch sw = new StopWatch();
ReindexResults reindexResults = new ReindexResults();
// Prefetch Resources from DB
boolean reindexSearchParameters =
myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
ourLog.info(
"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
myInstanceId,
myChunkId);
ReindexParameters parameters = new ReindexParameters()
.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
.setOptimizeStorage(myJobParameters.getOptimizeStorage())
.setOptimisticLock(myJobParameters.getOptimisticLock());
// Reindex
sw.restart();
for (int i = 0; i < myData.size(); i++) {
String nextResourceType = myData.getResourceType(i);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
try {
ReindexOutcome outcome =
dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
outcome.getWarnings().forEach(myDataSink::recoveredError);
reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork());
} catch (BaseServerResponseException | DataFormatException e) {
String resourceForcedId = myIdHelperService
.translatePidIdToForcedIdWithCache(resourcePersistentId)
.orElse(resourcePersistentId.toString());
String resourceId = nextResourceType + "/" + resourceForcedId;
ourLog.error("Failure during reindexing {}", resourceId, e);
myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
}
}
ourLog.info(
"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
myInstanceId,
myChunkId);
return reindexResults;
}
}


@@ -0,0 +1,101 @@
package ca.uhn.fhir.batch2.jobs.reindex.v1;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
@Deprecated(forRemoval = true, since = "7.6.0")
@Configuration
public class ReindexV1Config {
@Autowired
private ReindexJobService myReindexJobService;
@Autowired
private HapiTransactionService myHapiTransactionService;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Autowired
private DaoRegistry myRegistry;
@Autowired
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
@Autowired
@Qualifier("reindexGenerateRangeChunkStepV1")
private IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> myReindexGenerateRangeChunkStep;
@Autowired
@Qualifier("reindexLoadIdsStepV1")
private IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> myReindexLoadIdsStep;
@Autowired
private ReindexJobParametersValidatorV1 myReindexJobParametersValidatorV1;
// Version 1
@Bean
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1() {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_REINDEX)
.setJobDescription("Reindex resources")
.setJobDefinitionVersion(1)
.setParametersType(ReindexJobParameters.class)
.setParametersValidator(myReindexJobParametersValidatorV1)
.gatedExecution()
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",
ChunkRangeJson.class,
myReindexGenerateRangeChunkStep)
.addIntermediateStep(
"load-ids",
"Load IDs of resources to reindex",
ResourceIdListWorkChunkJson.class,
myReindexLoadIdsStep)
.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
.build();
}
@Bean
public ReindexStepV1 reindexStepV1() {
return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
}
@Bean("reindexGenerateRangeChunkStepV1")
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean("reindexLoadIdsStepV1")
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@Bean
public ReindexJobParametersValidatorV1 reindexJobParametersValidatorV1(IBatch2DaoSvc theBatch2DaoSvc) {
return new ReindexJobParametersValidatorV1(
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
}
}


@@ -0,0 +1,42 @@
package ca.uhn.fhir.batch2.jobs.reindex.v2;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.i18n.Msg;
import jakarta.annotation.Nonnull;
public class CheckPendingReindexWorkStep implements IJobStepWorker<ReindexJobParameters, ReindexResults, VoidModel> {
private final ReindexJobService myReindexJobService;
public CheckPendingReindexWorkStep(ReindexJobService theReindexJobService) {
myReindexJobService = theReindexJobService;
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexResults> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink)
throws JobExecutionFailedException {
ReindexResults results = theStepExecutionDetails.getData();
if (!results.getResourceToHasWorkToComplete().isEmpty()) {
if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceToHasWorkToComplete())) {
throw new RetryChunkLaterException(Msg.code(2553), ReindexUtils.getRetryLaterDelay());
}
}
return RunOutcome.SUCCESS;
}
}
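A minimal unit-test sketch of the step's two outcomes (Mockito; hypothetical, not part of this commit):

	ReindexJobService svc = mock(ReindexJobService.class);
	when(svc.anyResourceHasPendingReindexWork(anyMap())).thenReturn(true);

	ReindexResults results = new ReindexResults();
	results.addResourceTypeToCompletionStatus("CodeSystem", true);

	@SuppressWarnings("unchecked")
	StepExecutionDetails<ReindexJobParameters, ReindexResults> details = mock(StepExecutionDetails.class);
	when(details.getData()).thenReturn(results);

	CheckPendingReindexWorkStep step = new CheckPendingReindexWorkStep(svc);

	// While any resource type still reports pending work, the chunk is parked for retry...
	assertThrows(RetryChunkLaterException.class, () -> step.run(details, mock(IJobDataSink.class)));

	// ...and once no pending work remains, the step succeeds
	when(svc.anyResourceHasPendingReindexWork(anyMap())).thenReturn(false);
	assertSame(RunOutcome.SUCCESS, step.run(details, mock(IJobDataSink.class)));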


@@ -17,10 +17,11 @@
 * limitations under the License.
 * #L%
 */
-package ca.uhn.fhir.batch2.jobs.reindex;
+package ca.uhn.fhir.batch2.jobs.reindex.v2;

import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
@@ -28,11 +29,11 @@ import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

-public class ReindexJobParametersValidator implements IJobParametersValidator<ReindexJobParameters> {
+public class ReindexJobParametersValidatorV2 implements IJobParametersValidator<ReindexJobParameters> {

	private final IUrlListValidator myUrlListValidator;

-	public ReindexJobParametersValidator(IUrlListValidator theUrlListValidator) {
+	public ReindexJobParametersValidatorV2(IUrlListValidator theUrlListValidator) {
		myUrlListValidator = theUrlListValidator;
	}


@@ -0,0 +1,118 @@
package ca.uhn.fhir.batch2.jobs.reindex.v2;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import jakarta.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.REINDEX_MAX_RETRIES;
public class ReindexStepV2
implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {
private final ReindexJobService myReindexJobService;
private final HapiTransactionService myHapiTransactionService;
private final IFhirSystemDao<?, ?> mySystemDao;
private final DaoRegistry myDaoRegistry;
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
public ReindexStepV2(
ReindexJobService theJobService,
HapiTransactionService theHapiTransactionService,
IFhirSystemDao<?, ?> theSystemDao,
DaoRegistry theRegistry,
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
myDaoRegistry = theRegistry;
myHapiTransactionService = theHapiTransactionService;
mySystemDao = theSystemDao;
myIdHelperService = theIdHelperService;
myReindexJobService = theJobService;
}
@Nonnull
@Override
public RunOutcome run(
@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<ReindexResults> theDataSink)
throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
// This check is not strictly necessary, but we ensure that no outstanding
// "reindex work" is still waiting to be completed, so that the reindex work
// done here does not skip over that data
Map<String, Boolean> resourceTypesToCheckFlag = new HashMap<>();
data.getTypedPids().forEach(id -> {
// we don't really care about duplicates; we check by resource type
resourceTypesToCheckFlag.put(id.getResourceType(), true);
});
if (myReindexJobService.anyResourceHasPendingReindexWork(resourceTypesToCheckFlag)) {
throw new RetryChunkLaterException(Msg.code(2552), ReindexUtils.getRetryLaterDelay());
}
ReindexResults results = doReindex(
data,
theDataSink,
theStepExecutionDetails.getInstance().getInstanceId(),
theStepExecutionDetails.getChunkId(),
jobParameters);
theDataSink.accept(results);
return new RunOutcome(data.size());
}
public ReindexResults doReindex(
ResourceIdListWorkChunkJson data,
IJobDataSink<?> theDataSink,
String theInstanceId,
String theChunkId,
ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
TransactionDetails transactionDetails = new TransactionDetails();
ReindexTaskV2.JobParameters jp = new ReindexTaskV2.JobParameters();
jp.setData(data)
.setRequestDetails(requestDetails)
.setTransactionDetails(transactionDetails)
.setDataSink(theDataSink)
.setInstanceId(theInstanceId)
.setChunkId(theChunkId)
.setJobParameters(theJobParameters);
ReindexTaskV2 reindexJob = new ReindexTaskV2(jp, myDaoRegistry, mySystemDao, myIdHelperService);
return myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(data.getRequestPartitionId())
.execute(reindexJob);
}
}
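ReindexV2Config is not part of this section of the diff, so the wiring below is a sketch of how the V2 job definition presumably chains this step with CheckPendingReindexWorkStep; the step ids, description strings, and bean-method names are assumptions modeled on ReindexV1Config above:

	@Bean
	public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2() {
		return JobDefinition.newBuilder()
				.setJobDefinitionId(JOB_REINDEX)
				.setJobDescription("Reindex resources")
				.setJobDefinitionVersion(2)
				.setParametersType(ReindexJobParameters.class)
				.gatedExecution()
				.addFirstStep(
						"generate-ranges",
						"Generate data ranges to reindex",
						ChunkRangeJson.class,
						reindexGenerateRangeChunksStepV2())
				.addIntermediateStep(
						"load-ids",
						"Load IDs of resources to reindex",
						ResourceIdListWorkChunkJson.class,
						reindexLoadIdsStepV2())
				.addIntermediateStep(
						"reindex-start",
						"Perform the resource reindex",
						ReindexResults.class,
						reindexStepV2())
				.addLastStep(
						"reindex-pending-work",
						"Wait for any pending reindex work to complete",
						checkPendingReindexWorkStep())
				.build();
	}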


@@ -0,0 +1,201 @@
package ca.uhn.fhir.batch2.jobs.reindex.v2;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexWarningProcessor;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ReindexTaskV2 implements TransactionCallback<ReindexResults> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexTaskV2.class);
public static class JobParameters {
private ResourceIdListWorkChunkJson myData;
private RequestDetails myRequestDetails;
private TransactionDetails myTransactionDetails;
private IJobDataSink<?> myDataSink;
private String myChunkId;
private String myInstanceId;
private ReindexJobParameters myJobParameters;
public ResourceIdListWorkChunkJson getData() {
return myData;
}
public JobParameters setData(ResourceIdListWorkChunkJson theData) {
myData = theData;
return this;
}
public RequestDetails getRequestDetails() {
return myRequestDetails;
}
public JobParameters setRequestDetails(RequestDetails theRequestDetails) {
myRequestDetails = theRequestDetails;
return this;
}
public TransactionDetails getTransactionDetails() {
return myTransactionDetails;
}
public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) {
myTransactionDetails = theTransactionDetails;
return this;
}
public IJobDataSink<?> getDataSink() {
return myDataSink;
}
public JobParameters setDataSink(IJobDataSink<?> theDataSink) {
myDataSink = theDataSink;
return this;
}
public String getChunkId() {
return myChunkId;
}
public JobParameters setChunkId(String theChunkId) {
myChunkId = theChunkId;
return this;
}
public String getInstanceId() {
return myInstanceId;
}
public JobParameters setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
return this;
}
public ReindexJobParameters getJobParameters() {
return myJobParameters;
}
public JobParameters setJobParameters(ReindexJobParameters theJobParameters) {
myJobParameters = theJobParameters;
return this;
}
}
private final DaoRegistry myDaoRegistry;
private final IFhirSystemDao<?, ?> mySystemDao;
private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
private final ResourceIdListWorkChunkJson myData;
private final RequestDetails myRequestDetails;
private final TransactionDetails myTransactionDetails;
private final IJobDataSink<?> myDataSink;
private final String myChunkId;
private final String myInstanceId;
private final ReindexJobParameters myJobParameters;
public ReindexTaskV2(
JobParameters theJobParameters,
DaoRegistry theRegistry,
IFhirSystemDao<?, ?> theSystemDao,
IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
myDaoRegistry = theRegistry;
mySystemDao = theSystemDao;
myIdHelperService = theIdHelperService;
myData = theJobParameters.getData();
myRequestDetails = theJobParameters.getRequestDetails();
myTransactionDetails = theJobParameters.getTransactionDetails();
myDataSink = theJobParameters.getDataSink();
myInstanceId = theJobParameters.getInstanceId();
myChunkId = theJobParameters.getChunkId();
myJobParameters = theJobParameters.getJobParameters();
myDataSink.setWarningProcessor(new ReindexWarningProcessor());
}
@Override
public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) {
List<IResourcePersistentId<?>> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
ourLog.info(
"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
persistentIds.size(),
myInstanceId,
myChunkId);
StopWatch sw = new StopWatch();
ReindexResults reindexResults = new ReindexResults();
// Prefetch Resources from DB
boolean reindexSearchParameters =
myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
ourLog.info(
"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
myInstanceId,
myChunkId);
ReindexParameters parameters = new ReindexParameters()
.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
.setOptimizeStorage(myJobParameters.getOptimizeStorage())
.setOptimisticLock(myJobParameters.getOptimisticLock());
// Reindex
sw.restart();
for (int i = 0; i < myData.size(); i++) {
String nextResourceType = myData.getResourceType(i);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
try {
ReindexOutcome outcome =
dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
outcome.getWarnings().forEach(myDataSink::recoveredError);
reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork());
} catch (BaseServerResponseException | DataFormatException e) {
String resourceForcedId = myIdHelperService
.translatePidIdToForcedIdWithCache(resourcePersistentId)
.orElse(resourcePersistentId.toString());
String resourceId = nextResourceType + "/" + resourceForcedId;
ourLog.error("Failure during reindexing {}", resourceId, e);
myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
}
}
ourLog.info(
"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
persistentIds.size(),
sw,
sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
myInstanceId,
myChunkId);
return reindexResults;
}
}
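The ReindexResults model referenced above is not included in this excerpt. A plausible shape is sketched here purely to show how addResourceTypeToCompletionStatus(..) could carry the per-type pending flags forward to the next step; the field and accessor names are assumptions, not the real model class:

// Sketch only: the shipped ReindexResults model may differ.
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
import java.util.Map;

public class ReindexResultsSketch implements IModelJson {
	@JsonProperty("resourceToPendingWork")
	private Map<String, Boolean> myResourceToPendingWork = new HashMap<>();

	public void addResourceTypeToCompletionStatus(String theResourceType, boolean theHasPendingWork) {
		// OR-accumulate: once any resource of a type reports pending work, the type stays flagged
		myResourceToPendingWork.merge(theResourceType, theHasPendingWork, Boolean::logicalOr);
	}

	public Map<String, Boolean> getResourceTypes() {
		return myResourceToPendingWork;
	}
}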

View File

@ -0,0 +1,110 @@
package ca.uhn.fhir.batch2.jobs.reindex.v2;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
@Configuration
public class ReindexV2Config {
@Autowired
private ReindexJobService myReindexJobService;
@Autowired
private HapiTransactionService myHapiTransactionService;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Autowired
private DaoRegistry myRegistry;
@Autowired
private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
@Autowired
@Qualifier("reindexGenerateRangeChunkStepV2")
private IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> myReindexGenerateRangeChunkStep;
@Autowired
@Qualifier("reindexLoadIdsStepV2")
private IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> myReindexLoadIdsStep;
@Autowired
private ReindexJobParametersValidatorV2 myReindexJobParametersValidator;
// Version 2
@Bean
public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2() {
return JobDefinition.newBuilder()
.setJobDefinitionId(JOB_REINDEX)
.setJobDescription("Reindex resources")
.setJobDefinitionVersion(2)
.setParametersType(ReindexJobParameters.class)
.setParametersValidator(myReindexJobParametersValidator)
.gatedExecution()
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",
ChunkRangeJson.class,
myReindexGenerateRangeChunkStep)
.addIntermediateStep(
"load-ids",
"Load IDs of resources to reindex",
ResourceIdListWorkChunkJson.class,
myReindexLoadIdsStep)
.addIntermediateStep(
"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
.build();
}
@Bean
public CheckPendingReindexWorkStep pendingWorkStep() {
return new CheckPendingReindexWorkStep(myReindexJobService);
}
@Bean
public ReindexStepV2 reindexStepV2() {
return new ReindexStepV2(
myReindexJobService, myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
}
@Bean("reindexGenerateRangeChunkStepV2")
public IJobStepWorker<ReindexJobParameters, VoidModel, ChunkRangeJson> reindexGenerateRangeChunksStep() {
return new GenerateRangeChunksStep<>();
}
@Bean("reindexLoadIdsStepV2")
public IJobStepWorker<ReindexJobParameters, ChunkRangeJson, ResourceIdListWorkChunkJson> reindexLoadIdsStep(
IBatch2DaoSvc theBatch2DaoSvc) {
return new LoadIdsStep<>(theBatch2DaoSvc);
}
@Bean
public ReindexJobParametersValidatorV2 reindexJobParametersValidatorV2(IBatch2DaoSvc theBatch2DaoSvc) {
return new ReindexJobParametersValidatorV2(
new UrlListValidator(ProviderConstants.OPERATION_REINDEX, theBatch2DaoSvc));
}
}
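The body of CheckPendingReindexWorkStep is not in this hunk. Assuming it mirrors the guard in ReindexStepV2, it might look roughly like the sketch below; the Batch2 interfaces are real, but the ReindexResults accessor and the message code are placeholders:

// Hedged sketch of the final "reindex-pending-work" step: re-queue the chunk
// until the DAOs report that all deferred reindex work has drained.
public class CheckPendingReindexWorkStepSketch
		implements ILastJobStepWorker<ReindexJobParameters, ReindexResults> {

	private final ReindexJobService myReindexJobService;

	public CheckPendingReindexWorkStepSketch(ReindexJobService theReindexJobService) {
		myReindexJobService = theReindexJobService;
	}

	@Nonnull
	@Override
	public RunOutcome run(
			@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexResults> theStepExecutionDetails,
			@Nonnull IJobDataSink<VoidModel> theDataSink) {
		ReindexResults results = theStepExecutionDetails.getData();
		// getResourceTypes() is an assumed accessor; see the ReindexResults sketch above
		if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceTypes())) {
			throw new RetryChunkLaterException(Msg.code(2553) /* placeholder code */, ReindexUtils.getRetryLaterDelay());
		}
		return new RunOutcome(0);
	}
}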

View File

@ -1,6 +1,7 @@
 package ca.uhn.fhir.batch2.jobs.reindex;
 import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
+import ca.uhn.fhir.batch2.jobs.reindex.v1.ReindexJobParametersValidatorV1;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@ -13,13 +14,13 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(MockitoExtension.class)
-public class ReindexJobParametersValidatorTest {
+public class ReindexTaskParametersValidatorTest {
 @Mock
 private UrlListValidator myListValidator;
 @InjectMocks
-private ReindexJobParametersValidator myValidator;
+private ReindexJobParametersValidatorV1 myValidator;
 @ParameterizedTest
 @ValueSource(strings = { "\n", " ", "\t" })

View File

@ -38,11 +38,18 @@ public class RetryChunkLaterException extends RuntimeException {
 private final Duration myNextPollDuration;
 public RetryChunkLaterException() {
-this(ONE_MINUTE);
+this("", ONE_MINUTE);
 }
+/**
+ * For HAPI exceptions, use {@link RetryChunkLaterException#RetryChunkLaterException(String, Duration)}
+ */
 public RetryChunkLaterException(Duration theDuration) {
-super();
+this("", theDuration);
+}
+public RetryChunkLaterException(String theCode, Duration theDuration) {
+super(theCode);
 this.myNextPollDuration = theDuration;
 }
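Chaining the no-arg and Duration-only constructors through the new (String, Duration) constructor gives every instance a single initialization path and lets callers attach a HAPI message code. This is exactly how the v2 reindex step above uses it:

// Usage (from ReindexStepV2): defer this chunk; Batch2 polls it again after the delay.
throw new RetryChunkLaterException(Msg.code(2552), ReindexUtils.getRetryLaterDelay());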

View File

@ -62,7 +62,7 @@ public class ResourceIdListWorkChunkJson implements IModelJson {
 return myRequestPartitionId;
 }
-private List<TypedPidJson> getTypedPids() {
+public List<TypedPidJson> getTypedPids() {
 if (myTypedPids == null) {
 myTypedPids = new ArrayList<>();
 }

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
 import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
 import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
 import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
+import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
 import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
 import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -315,12 +316,24 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
 * @param theResourcePersistentId The ID
 * @return
 */
+@SuppressWarnings("rawtypes")
 ReindexOutcome reindex(
 IResourcePersistentId theResourcePersistentId,
 ReindexParameters theReindexParameters,
 RequestDetails theRequest,
 TransactionDetails theTransactionDetails);
+/**
+ * Returns a ReindexJobStatus object telling the caller whether
+ * a reindex job still has work in progress.
+ *
+ * DAO implementations that require additional (deferred) work
+ * during reindexing should override this method.
+ */
+default ReindexJobStatus getReindexJobStatus() {
+return ReindexJobStatus.NO_WORK_NEEDED;
+}
 void removeTag(
 IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode, RequestDetails theRequestDetails);
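As an example, a DAO whose reindexing defers writes (the CodeSystem term DAO is the motivating case for this commit) could override the new default along these lines; the deferred-storage collaborator shown here is an assumption for illustration, not the actual HAPI service API:

// Hypothetical override for a DAO with deferred reindex work.
@Override
public ReindexJobStatus getReindexJobStatus() {
	ReindexJobStatus status = new ReindexJobStatus();
	// myDeferredStorageSvc is illustrative; the real term service exposes its own check
	status.setHasReindexWorkPending(!myDeferredStorageSvc.isQueueEmpty());
	return status;
}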

View File

@ -29,6 +29,11 @@ public class ReindexOutcome {
 private List<String> myWarnings;
+/**
+ * True if there is additional (async) work to wait on.
+ */
+private boolean myHasPendingWork;
 public List<String> getWarnings() {
 return defaultIfNull(myWarnings, Collections.emptyList());
 }
@ -39,4 +44,12 @@ }
 }
 myWarnings.add(theWarning);
 }
+public boolean isHasPendingWork() {
+return myHasPendingWork;
+}
+public void setHasPendingWork(boolean theHasPendingWork) {
+myHasPendingWork = theHasPendingWork;
+}
 }

View File

@ -0,0 +1,16 @@
package ca.uhn.fhir.jpa.api.model;
public class ReindexJobStatus {
// Shared sentinel instance; do not call setters on it.
public static final ReindexJobStatus NO_WORK_NEEDED = new ReindexJobStatus();
private boolean myHasReindexWorkPending;
public boolean isHasReindexWorkPending() {
return myHasReindexWorkPending;
}
public void setHasReindexWorkPending(boolean theHasReindexWorkPending) {
myHasReindexWorkPending = theHasReindexWorkPending;
}
}