Compare commits
3 Commits: 5b50545a50 ... d677d83cce

Author | SHA1 | Date
---|---|---
leif stawnyczy | d677d83cce |
leif stawnyczy | bddcea8db6 |
leif stawnyczy | bb5d91ac37 |
@@ -0,0 +1,17 @@
+---
+type: fix
+issue: 6285
+title: "Updated the Reindex Batch2 job to allow
+  for an additional step that will check to ensure
+  that no pending 'reindex' work is needed.
+  This was done to prevent a bug in which
+  value set expansion would not return all
+  the existing CodeSystem Concepts after
+  a reindex call, due to some of the concepts
+  being deferred to future job runs.
+
+  As such, `$reindex` operations on CodeSystems
+  will no longer result in incorrect value set
+  expansion when such an expansion is called
+  'too soon' after a $reindex operation.
+  "
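The changelog above describes a gating step: after reindexing, the batch job should not complete while term-storage work is still deferred. A minimal, self-contained sketch of that idea (illustrative only, not the HAPI batch2 machinery):

// Sketch: block until a "pending work" probe reports false, or a timeout
// elapses. The new job step follows this shape, but is driven by the batch2
// framework's retry mechanism rather than a sleep loop.
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class PendingWorkGate {
	private PendingWorkGate() {}

	static boolean awaitNoPendingWork(BooleanSupplier hasPendingWork, Duration pollInterval, Duration timeout)
			throws InterruptedException {
		long deadline = System.nanoTime() + timeout.toNanos();
		while (hasPendingWork.getAsBoolean()) {
			if (System.nanoTime() >= deadline) {
				return false; // gave up; the caller decides whether to fail the job
			}
			Thread.sleep(pollInterval.toMillis());
		}
		return true; // no pending work remains
	}
}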
@@ -27,7 +27,11 @@ import ca.uhn.fhir.context.support.IValidationSupport.CodeValidationResult;
 import ca.uhn.fhir.context.support.LookupCodeRequest;
 import ca.uhn.fhir.context.support.ValidationSupportContext;
 import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
+import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
+import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
+import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
 import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
 import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@@ -176,6 +180,47 @@ public class JpaResourceDaoCodeSystem<T extends IBaseResource> extends BaseHapiF
 		myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);
 	}
 
+	/**
+	 * If there are more code systems to process
+	 * than {@link JpaStorageSettings#getDeferIndexingForCodesystemsOfSize()},
+	 * then these codes will have their processing deferred (for a later time).
+	 *
+	 * This can result in future reindex steps *skipping* these code systems (if
+	 * they're still deferred) and thus incorrect expansions resulting.
+	 *
+	 * So we override the reindex method for CodeSystems specifically to
+	 * force reindex batch jobs to wait until all code systems are processed before
+	 * moving on.
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public ReindexOutcome reindex(
+			IResourcePersistentId thePid,
+			ReindexParameters theReindexParameters,
+			RequestDetails theRequest,
+			TransactionDetails theTransactionDetails) {
+		ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails);
+
+		if (outcome.getWarnings().isEmpty()) {
+			outcome.setHasPendingWork(true);
+		}
+		return outcome;
+	}
+
+	@Override
+	public ReindexJobStatus getReindexJobStatus() {
+		boolean isQueueEmpty = myTermDeferredStorageSvc.isStorageQueueEmpty(true);
+
+		ReindexJobStatus status = new ReindexJobStatus();
+		status.setHasReindexWorkPending(!isQueueEmpty);
+		if (status.isHasReindexWorkPending()) {
+			// force a run
+			myTermDeferredStorageSvc.saveDeferred();
+		}
+
+		return status;
+	}
+
 	@Override
 	public ResourceTable updateEntity(
 			RequestDetails theRequest,
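The two overrides above work as a pair: reindex() flags pending work, and getReindexJobStatus() both reports it and nudges the deferred queue forward. A sketch of how a job step might poll that status (hypothetical poller; HAPI's actual step is retried by the batch2 framework rather than sleeping):

import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;

import java.util.function.Supplier;

final class ReindexStatusPoller {
	private ReindexStatusPoller() {}

	// Polls a status supplier (e.g. built on the DAO's getReindexJobStatus())
	// until no reindex work remains pending.
	static void awaitDone(Supplier<ReindexJobStatus> statusSupplier, long pollMillis) throws InterruptedException {
		ReindexJobStatus status = statusSupplier.get();
		while (status.isHasReindexWorkPending()) {
			Thread.sleep(pollMillis);
			status = statusSupplier.get();
		}
	}
}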
@@ -593,7 +593,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
 		if (theStatisticsTracker.getUpdatedConceptCount() <= myStorageSettings.getDeferIndexingForCodesystemsOfSize()) {
 			saveConcept(conceptToAdd);
 			Long nextConceptPid = conceptToAdd.getId();
-			Validate.notNull(nextConceptPid);
+			Objects.requireNonNull(nextConceptPid);
 		} else {
 			myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd);
 		}
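The branch above is the source of the deferred work the new job step waits for: once more than getDeferIndexingForCodesystemsOfSize() concepts have been stored, further concepts are queued instead of saved inline. A toy model of the threshold (plain JDK, not HAPI code); with a limit of 3 and 6 concepts, concepts 1-3 are saved inline and 4-6 are deferred, which is exactly the situation the new test at the end of this changeset sets up:

import java.util.ArrayList;
import java.util.List;

final class DeferralDemo {
	public static void main(String[] args) {
		int deferThreshold = 3;
		List<String> savedInline = new ArrayList<>();
		List<String> deferred = new ArrayList<>();
		for (int i = 1; i <= 6; i++) {
			String concept = "concept-" + i;
			if (savedInline.size() < deferThreshold) {
				savedInline.add(concept); // indexed immediately
			} else {
				deferred.add(concept); // left for a later deferred-storage run
			}
		}
		System.out.println("inline=" + savedInline + " deferred=" + deferred);
	}
}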
@@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.term;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
-import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
@@ -79,6 +78,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 	private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
 	private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
 	private boolean myAllowDeferredTasksTimeout = true;
+	private static final List<String> BATCH_JOBS_TO_CARE_ABOUT =
+			List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
 	private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
 	private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
 	private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
@@ -448,15 +449,18 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 		 * This is mostly a fail-safe
 		 * because "cancelled" jobs are never removed.
 		 */
-		List<String> executions = new ArrayList<>(myJobExecutions);
 		List<String> idsToDelete = new ArrayList<>();
-		for (String id : executions) {
-			// TODO - might want to consider a "fetch all instances"
-			JobInstance instance = myJobCoordinator.getInstance(id);
-			if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) {
+		for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) {
+			List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
+					jobId,
+					true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only)
+					Math.max(myJobExecutions.size(), 1), // at most this many
+					0);
+			for (JobInstance instance : jobInstanceInEndedState) {
 				idsToDelete.add(instance.getInstanceId());
 			}
 		}
 
 		for (String id : idsToDelete) {
 			myJobExecutions.remove(id);
 		}
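The rewritten cleanup replaces one getInstance() call per tracked execution with a single ended-status query per job definition. The generic shape of that pattern, with a stand-in for IJobCoordinator#getInstancesbyJobDefinitionIdAndEndedStatus (a sketch under those assumptions, not the HAPI implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class EndedJobCleanup {
	// fetchEnded takes (jobDefinitionId, pageSize) and stands in for the ended-status
	// query: it returns the ids of COMPLETED / FAILED / CANCELLED instances.
	static void prune(List<String> trackedExecutions,
			List<String> jobDefinitionIds,
			BiFunction<String, Integer, List<String>> fetchEnded) {
		List<String> idsToDelete = new ArrayList<>();
		int limit = Math.max(trackedExecutions.size(), 1);
		for (String jobId : jobDefinitionIds) {
			idsToDelete.addAll(fetchEnded.apply(jobId, limit));
		}
		trackedExecutions.removeAll(idsToDelete);
	}
}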
@@ -23,7 +23,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamUri;
 import ca.uhn.fhir.jpa.model.entity.StorageSettings;
 import ca.uhn.fhir.jpa.model.util.SearchParamHash;
 import ca.uhn.fhir.jpa.model.util.UcumServiceUtil;
-import ca.uhn.fhir.jpa.reindex.ReindexStepTest;
+import ca.uhn.fhir.jpa.reindex.ReindexStepV1Test;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.rest.param.BaseParam;
@@ -321,7 +321,7 @@ public class FhirResourceDaoR4IndexStorageOptimizedTest extends BaseJpaR4Test {
 
 	// Additional existing tests with enabled IndexStorageOptimized
 	@Nested
-	public class IndexStorageOptimizedReindexStepTest extends ReindexStepTest {
+	public class IndexStorageOptimizedReindexStepTestV1 extends ReindexStepV1Test {
 		@BeforeEach
 		void setUp() {
 			myStorageSettings.setIndexStorageOptimized(true);
@@ -2,12 +2,15 @@ package ca.uhn.fhir.jpa.dao.r4;
 
 import ca.uhn.fhir.batch2.api.IJobDataSink;
 import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
 import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeStep;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.support.ValidationSupportContext;
 import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
@@ -18,13 +21,13 @@ import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
 import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
 import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
 import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
-import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
 import ca.uhn.fhir.jpa.entity.TermValueSet;
 import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
 import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
+import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
 import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
@@ -146,7 +149,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 	@Autowired
 	private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
 	@Autowired
-	private ReindexStep myReindexStep;
+	private ReindexStepV1 myReindexStepV1;
 	@Autowired
 	private DeleteExpungeStep myDeleteExpungeStep;
 	@Autowired
@@ -1018,7 +1021,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		// insert to: HFJ_RESOURCE, HFJ_RES_VER, HFJ_RES_LINK
 		assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
 		assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
-
 	}
 
 	@ParameterizedTest
@@ -1031,7 +1033,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 	})
 	public void testReindexJob_OptimizeStorage(boolean theOptimisticLock, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageModeEnum, int theExpectedSelectCount, int theExpectedUpdateCount) {
 		// Setup
-
 		ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
 		IIdType patientId = createPatient(withActiveTrue());
 		IIdType orgId = createOrganization(withName("MY ORG"));
@@ -1056,7 +1057,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 
 		// execute
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class));
 
 		// validate
 		assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(theExpectedSelectCount);
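The same JobInstance/WorkChunk/StepExecutionDetails construction recurs in every migrated test below. A hypothetical test helper (not part of this changeset) that would centralize it, using the types imported in the hunks above:

// Hypothetical helper for the repeated setup pattern in the migrated tests.
static StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> reindexStepDetails(
		ReindexJobParameters theParams, ResourceIdListWorkChunkJson theData) {
	JobInstance instance = new JobInstance();
	instance.setInstanceId("index-id");
	WorkChunk chunk = new WorkChunk();
	chunk.setId("chunk-id");
	return new StepExecutionDetails<>(theParams, theData, instance, chunk);
}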
@@ -1064,7 +1072,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		assertThat(myCaptureQueriesListener.getInsertQueriesForCurrentThread()).isEmpty();
 		assertThat(myCaptureQueriesListener.getDeleteQueriesForCurrentThread()).isEmpty();
 		assertEquals(10, outcome.getRecordsProcessed());
-
 	}
 
 	@Test
|
@ -1095,7 +1102,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
myCaptureQueriesListener.clear();
|
myCaptureQueriesListener.clear();
|
||||||
RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
|
JobInstance instance = new JobInstance();
|
||||||
|
StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
|
||||||
|
params,
|
||||||
|
data,
|
||||||
|
instance,
|
||||||
|
mock(WorkChunk.class)
|
||||||
|
);
|
||||||
|
RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, mock(IJobDataSink.class));
|
||||||
assertEquals(20, outcome.getRecordsProcessed());
|
assertEquals(20, outcome.getRecordsProcessed());
|
||||||
|
|
||||||
// validate
|
// validate
|
||||||
|
@@ -1103,10 +1117,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
 		assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
 		assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
-
 	}
 
-
 	public void assertNoPartitionSelectors() {
 		List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread();
 		for (SqlQuery next : selectQueries) {
@@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-public class ReindexStepTest {
+public class ReindexStepV1Test {
 
 	@Mock
 	private HapiTransactionService myHapiTransactionService;
@@ -34,7 +34,7 @@ public class ReindexStepTest {
 	private IJobDataSink<VoidModel> myDataSink;
 
 	@InjectMocks
-	private ReindexStep myReindexStep;
+	private ReindexStepV1 myReindexStepV1;
 
 	@Captor
 	private ArgumentCaptor<HapiTransactionService.ExecutionBuilder> builderArgumentCaptor;
@@ -51,7 +51,7 @@ public class ReindexStepTest {
 		when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();
 
 		// when
-		myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters);
+		myReindexStepV1.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters);
 
 		// then
 		assertMethodArgumentRequestPartitionId(expectedPartitionId);
@@ -2,10 +2,13 @@ package ca.uhn.fhir.jpa.reindex;
 
 import ca.uhn.fhir.batch2.api.IJobDataSink;
 import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
-import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexStepV1;
+import ca.uhn.fhir.batch2.model.JobInstance;
+import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
@ -25,13 +28,14 @@ import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
|
||||||
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
|
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
public class ReindexStepTest extends BaseJpaR4Test {
|
public class ReindexStepV1Test extends BaseJpaR4Test {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ReindexStep myReindexStep;
|
private ReindexStepV1 myReindexStepV1;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private IJobDataSink<VoidModel> myDataSink;
|
private IJobDataSink<VoidModel> myDataSink;
|
||||||
|
@@ -46,9 +50,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
 
 	@Test
 	public void testReindex_NoActionNeeded() {
-
 		// Setup
-
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
 		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
 
@@ -57,9 +59,19 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		data.addTypedPid("Patient", id1);
 
 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		instance.setInstanceId("index-id");
+		WorkChunk chunk = new WorkChunk();
+		chunk.setId("chunk-id");
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				chunk
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);
 
 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -72,12 +84,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
 	}
 
-
 	@Test
 	public void testReindex_NoActionNeeded_IndexMissingFieldsEnabled() {
-
 		// Setup
-
 		myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED);
 
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
@@ -88,9 +97,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		data.addTypedPid("Patient", id1);
 
 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);
 
 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -121,9 +137,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		});
 
 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);
 
 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -136,12 +159,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		assertEquals(0, myCaptureQueriesListener.getRollbackCount());
 	}
 
-
 	@Test
 	public void testReindex_IndexesAddedAndRemoved_IndexMissingFieldsEnabled() {
-
 		// Setup
-
 		myStorageSettings.setIndexMissingFields(JpaStorageSettings.IndexEnabledEnum.ENABLED);
 		boolean markResourcesForReindexingUponSearchParameterChange = myStorageSettings.isMarkResourcesForReindexingUponSearchParameterChange();
 		myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false); // if this were true, it would set up a lot of reindex jobs extraneous to the one we're trying to test
@@ -189,9 +209,16 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		mySearchParamRegistry.forceRefresh();
 
 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				mock(WorkChunk.class)
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);
 
 		// Verify
 		assertEquals(2, outcome.getRecordsProcessed());
@@ -207,9 +234,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
 
 	@Test
 	public void testReindex_OneResourceReindexFailedButOthersSucceeded() {
-
 		// Setup
-
 		Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
 		Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
 		Long idPatientToInvalidate = createPatient().getIdPartAsLong();
@@ -234,9 +259,19 @@ public class ReindexStepTest extends BaseJpaR4Test {
 		});
 
 		// Execute
+		ReindexJobParameters params = new ReindexJobParameters();
 		myCaptureQueriesListener.clear();
-		RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
+		JobInstance instance = new JobInstance();
+		instance.setInstanceId("index-id");
+		WorkChunk workChunk = new WorkChunk();
+		workChunk.setId("workid");
+		StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(
+				params,
+				data,
+				instance,
+				workChunk
+		);
+		RunOutcome outcome = myReindexStepV1.run(stepExecutionDetails, myDataSink);
 
 		// Verify
 		assertEquals(4, outcome.getRecordsProcessed());
@@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @SuppressWarnings("SqlDialectInspection")
-public class ReindexJobTest extends BaseJpaR4Test {
+public class ReindexTaskTest extends BaseJpaR4Test {
 
 	@Autowired
 	private IJobCoordinator myJobCoordinator;
@@ -1,8 +1,8 @@
 package ca.uhn.fhir.jpa.reindex;
 
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
-import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
 import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
+import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@@ -27,7 +27,7 @@ import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
+public class ReindexTaskWithPartitioningTest extends BaseJpaR4Test {
 
 	@Autowired
 	private IJobCoordinator myJobCoordinator;
@@ -1,8 +1,8 @@
 package ca.uhn.fhir.jpa.term;
 
-import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
 import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
 import ca.uhn.fhir.context.FhirVersionEnum;
 import ca.uhn.fhir.context.RuntimeSearchParam;
|
@ -36,7 +36,6 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.util.BundleBuilder;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import jakarta.annotation.Nonnull;
|
import jakarta.annotation.Nonnull;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
@@ -54,22 +53,21 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Pageable;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
 import org.springframework.transaction.support.TransactionTemplate;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME;
-import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;
 import static ca.uhn.fhir.util.HapiExtensions.EXT_VALUESET_EXPANSION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
@ -2099,659 +2097,169 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void expandValuesetAfterReindex() {
|
public void reindexCodeSystems_withDeferredCodeSystems_reindexesAllCodeSystems() {
|
||||||
// setup
|
// setup
|
||||||
IParser parser = myFhirContext.newJsonParser();
|
int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize();
|
||||||
CodeSystem cs;
|
|
||||||
ValueSet vs;
|
try {
|
||||||
String csStr;
|
/**
|
||||||
{
|
* The deferred count must be less than the number of
|
||||||
String vsStr = """
|
* concepts we are going to be uploading.
|
||||||
|
* That way, when we do the reindex, it will defer
|
||||||
|
* the additional code systems for a later job run.
|
||||||
|
*
|
||||||
|
* See {@link TermCodeSystemStorageSvcImpl#addConceptInHierarchy(TermCodeSystemVersion, Collection, TermConcept, UploadStatistics, Map, int)}
|
||||||
|
*
|
||||||
|
* Our CodeSystem below only has 6 Concepts to add.
|
||||||
|
* So we'll set the deferred count to 3 (so 3 will be deferred)
|
||||||
|
*/
|
||||||
|
myStorageSettings.setDeferIndexingForCodesystemsOfSize(3);
|
||||||
|
/*
|
||||||
|
* We're also setting our retry delay to a short timeframe
|
||||||
|
* so this test doesn't run too long.
|
||||||
|
*/
|
||||||
|
ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));
|
||||||
|
|
||||||
|
IParser parser = myFhirContext.newJsonParser();
|
||||||
|
|
||||||
|
RequestDetails rq = new SystemRequestDetails();
|
||||||
|
CodeSystem cs;
|
||||||
|
ValueSet vs;
|
||||||
|
String csStr;
|
||||||
|
{
|
||||||
|
String vsStr = """
|
||||||
|
{
|
||||||
|
"resourceType": "ValueSet",
|
||||||
|
"id": "0447bffa-01fa-4405-828a-96192e74a5d8",
|
||||||
|
"meta": {
|
||||||
|
"versionId": "2",
|
||||||
|
"lastUpdated": "2024-04-09T15:06:24.025+00:00",
|
||||||
|
"source": "#f4491e490a6a2900"
|
||||||
|
},
|
||||||
|
"url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"name": "IDMS-SUBMISSION-TYPES",
|
||||||
|
"title": "IDMS Submission Types",
|
||||||
|
"status": "active",
|
||||||
|
"experimental": false,
|
||||||
|
"date": "2023-09-28",
|
||||||
|
"publisher": "IDMS",
|
||||||
|
"description": "List of Submission Types",
|
||||||
|
"compose": {
|
||||||
|
"include": [
|
||||||
|
{
|
||||||
|
"system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
vs = parser.parseResource(ValueSet.class, vsStr);
|
||||||
|
csStr = """
|
||||||
{
|
{
|
||||||
"resourceType": "ValueSet",
|
"resourceType": "CodeSystem",
|
||||||
"id": "0447bffa-01fa-4405-828a-96192e74a5d8",
|
"id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3",
|
||||||
"meta": {
|
"meta": {
|
||||||
"versionId": "2",
|
"versionId": "14",
|
||||||
"lastUpdated": "2024-04-09T15:06:24.025+00:00",
|
"lastUpdated": "2024-06-03T17:49:56.580+00:00",
|
||||||
"source": "#f4491e490a6a2900"
|
"source": "#261a82258b0978a8"
|
||||||
},
|
},
|
||||||
"url": "https://health.gov.on.ca/idms/fhir/ValueSet/IDMS-Submission-Types",
|
"url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types",
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"name": "IDMS-SUBMISSION-TYPES",
|
"name": "IDMS-Internal-Submission-Types",
|
||||||
"title": "IDMS Submission Types",
|
"status": "active",
|
||||||
"status": "active",
|
"date": "2023-09-07",
|
||||||
"experimental": false,
|
"publisher": "IDMS",
|
||||||
"date": "2023-09-28",
|
"description": "This contains a lists of codes Submission Type Codes.",
|
||||||
"publisher": "IDMS",
|
"content": "complete",
|
||||||
"description": "List of Submission Types",
|
"concept": [
|
||||||
"compose": {
|
{
|
||||||
"include": [
|
"code": "SUB-BRAND-PRODUCT",
|
||||||
{
|
"display": "New Brand Product (Non-New Chemical Entity)"
|
||||||
"system": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types"
|
},
|
||||||
}
|
{
|
||||||
]
|
"code": "SUB-CLASSIFICATION",
|
||||||
}
|
"display": "Classification Change"
|
||||||
}
|
},
|
||||||
""";
|
{
|
||||||
vs = parser.parseResource(ValueSet.class, vsStr);
|
"code": "SUB-CLINICIAN-LED-SUBMISSIONS",
|
||||||
csStr = """
|
"display": "Clinician-Led Submissions"
|
||||||
{
|
},
|
||||||
"resourceType": "CodeSystem",
|
{
|
||||||
"id": "d9acd5b8-9533-4fa1-bb70-b4380957a8c3",
|
"code": "SUB-DELISTING",
|
||||||
"meta": {
|
"display": "Delisting"
|
||||||
"versionId": "14",
|
},
|
||||||
"lastUpdated": "2024-06-03T17:49:56.580+00:00",
|
{
|
||||||
"source": "#261a82258b0978a8"
|
"code": "SUB-DIN-CHANGE",
|
||||||
},
|
"display": "Drug Identification Number (DIN) Change"
|
||||||
"url": "https://health.gov.on.ca/idms/fhir/CodeSystem/Internal-Submission-Types",
|
},
|
||||||
"version": "1.0.0",
|
{
|
||||||
"name": "IDMS-Internal-Submission-Types",
|
"code": "SUB-DISTRIBUTOR",
|
||||||
"status": "active",
|
"display": "Distributor Change"
|
||||||
"date": "2023-09-07",
|
}
|
||||||
"publisher": "IDMS",
|
]
|
||||||
"description": "This contains a lists of codes Submission Type Codes.",
|
}
|
||||||
"content": "complete",
|
""";
|
||||||
"concept": [
|
|
||||||
{
|
|
||||||
"code": "SUB-BRAND-PRODUCT",
|
|
||||||
"display": "New Brand Product (Non-New Chemical Entity)"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-CLASSIFICATION",
|
|
||||||
"display": "Classification Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-CLINICIAN-LED-SUBMISSIONS",
|
|
||||||
"display": "Clinician-Led Submissions"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DELISTING",
|
|
||||||
"display": "Delisting"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIN-CHANGE",
|
|
||||||
"display": "Drug Identification Number (DIN) Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DISTRIBUTOR",
|
|
||||||
"display": "Distributor Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DRUG-PRODUCT-NAME",
|
|
||||||
"display": "Drug Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-FORMULATION",
|
|
||||||
"display": "Formulation Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-GENERIC-LINE",
|
|
||||||
"display": "Generic Line Extension"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-GENERIC-PRODUCT",
|
|
||||||
"display": "New Generic Product"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-LINE-EXTENSION",
|
|
||||||
"display": "Line Extension"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-INDICATION",
|
|
||||||
"display": "New Indication"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-NATURAL",
|
|
||||||
"display": "New Natural Health Product"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-OVER-COUNTER",
|
|
||||||
"display": "New Over the Counter (OTC) Product"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-CHEMICAL",
|
|
||||||
"display": "New Chemical Entity"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-NUTRITIONAL",
|
|
||||||
"display": "New Nutrition Product"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NUTRITIONAL-LINE",
|
|
||||||
"display": "Line Extension"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-BIOSIMILAR",
|
|
||||||
"display": "New Biosimilar"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIOSIMILAR-LINE",
|
|
||||||
"display": "Line Extension"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-DIABETIC",
|
|
||||||
"display": "New Diabetic Testing Agent (DTA)"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-GLUCOUSE",
|
|
||||||
"display": "New Flash Glucose Monitoring (FGM) Product"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NON-FORMULARY-SINGLE",
|
|
||||||
"display": "Non-Formulary Single Source"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NON-FORMULARY-MULTIPLE",
|
|
||||||
"display": "Non-Formulary Multiple Source "
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-ORG-NAME",
|
|
||||||
"display": "Organization Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTHER-TYPE-IN",
|
|
||||||
"display": "Other"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OWNERSHIP",
|
|
||||||
"display": "Ownership Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-PRICE-CHANGE",
|
|
||||||
"display": "Price Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-PRODUCT-DISCONTINUE",
|
|
||||||
"display": "Product Discontinuation"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-PRODUCT-MONOGRAPH",
|
|
||||||
"display": "Product Monograph Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-PRODUCT-NAME",
|
|
||||||
"display": "Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-PRODUCT-CHANGE",
|
|
||||||
"display": "Product Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-SPECIAL-PROJECTS",
|
|
||||||
"display": "Special Projects"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-TEMPORARY-BENEFITS",
|
|
||||||
"display": "Temporary Benefits"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-VALVED",
|
|
||||||
"display": "New Valved Holding Chamber (VHC)"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-REDESIGNATION-BENEFITS",
|
|
||||||
"display": "Re-Designation of Benefits"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MANUFACTURER-INQUIRIES",
|
|
||||||
"display": "Manufacturer Inquiries"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-CHALLENGES",
|
|
||||||
"display": "Challenges"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-NEW-LINE",
|
|
||||||
"display": "Line Extension"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-DIN-CHANGE",
|
|
||||||
"display": "Drug Identification Number (DIN) Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-DRUG-PRODUCT-NAME",
|
|
||||||
"display": "Drug Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-PRODUCT-MONOGRAPH",
|
|
||||||
"display": "Product Monograph Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-FORMULATION",
|
|
||||||
"display": "Formulation Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-OWNERSHIP",
|
|
||||||
"display": "Ownership Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-DISTRIBUTOR",
|
|
||||||
"display": "Distributor Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-ORG-NAME",
|
|
||||||
"display": "Organization Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-PRODUCT-DISCONTINUE",
|
|
||||||
"display": "Product Discontinuation"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-DELISTING",
|
|
||||||
"display": "Delisting"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-PRICE-CHANGE",
|
|
||||||
"display": "Price Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-MULTI-OTHER-TYPE-IN",
|
|
||||||
"display": "Other"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-DIN-CHANGE",
|
|
||||||
"display": "Drug Identification Number (DIN) Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-DRUG-PRODUCT-NAME",
|
|
||||||
"display": "Drug Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-PRODUCT-MONOGRAPH",
|
|
||||||
"display": "Product Monograph Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-FORMULATION",
|
|
||||||
"display": "Formulation Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-OWNERSHIP",
|
|
||||||
"display": "Ownership Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-DISTRIBUTOR",
|
|
||||||
"display": "Distributor Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-ORG-NAME",
|
|
||||||
"display": "Organization Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-PRODUCT-DISCONTINUE",
|
|
||||||
"display": "Product Discontinuation"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-DELISTING",
|
|
||||||
"display": "Delisting"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-PRICE-CHANGE",
|
|
||||||
"display": "Price Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-BIO-OTHER-TYPE-IN",
|
|
||||||
"display": "Other"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-DIN-CHANGE",
|
|
||||||
"display": "Drug Identification Number (DIN) Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-DRUG-PRODUCT-NAME",
|
|
||||||
"display": "Drug Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-PRODUCT-MONOGRAPH",
|
|
||||||
"display": "Product Monograph Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-FORMULATION",
|
|
||||||
"display": "Formulation Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-OWNERSHIP",
|
|
||||||
"display": "Ownership Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-DISTRIBUTOR",
|
|
||||||
"display": "Distributor Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-ORG-NAME",
|
|
||||||
"display": "Organization Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-PRODUCT-DISCONTINUE",
|
|
||||||
"display": "Product Discontinuation"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-DELISTING",
|
|
||||||
"display": "Delisting"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-PRICE-CHANGE",
|
|
||||||
"display": "Price Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-OTC-OTHER-TYPE-IN",
|
|
||||||
"display": "Other"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-PRODUCT-NAME",
|
|
||||||
"display": "Product Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-PRODUCT-CHANGE",
|
|
||||||
"display": "Product Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-OWNERSHIP",
|
|
||||||
"display": "Ownership Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-DISTRIBUTOR",
|
|
||||||
"display": "Distributor Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-ORG-NAME",
|
|
||||||
"display": "Organization Name Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-PRODUCT-DISCONTINUE",
|
|
||||||
"display": "Product Discontinuation"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-DELISTING",
|
|
||||||
"display": "Delisting"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-PRICE-CHANGE",
|
|
||||||
"display": "Price Change"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-DIABETIC-OTHER-TYPE-IN",
|
|
||||||
"display": "Other"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"code": "SUB-GLUCOSE-PRODUCT-NAME",
|
|
||||||
"display": "Product Name Change"
|
|
},
{
	"code": "SUB-GLUCOSE-PRODUCT-CHANGE",
	"display": "Product Change"
},
{
	"code": "SUB-GLUCOSE-OWNERSHIP",
	"display": "Ownership Change"
},
{
	"code": "SUB-GLUCOSE-DISTRIBUTOR",
	"display": "Distributor Change"
},
{
	"code": "SUB-GLUCOSE-ORG-NAME",
	"display": "Organization Name Change"
},
{
	"code": "SUB-GLUCOSE-PRODUCT-DISCONTINUE",
	"display": "Product Discontinuation"
},
{
	"code": "SUB-GLUCOSE-DELISTING",
	"display": "Delisting"
},
{
	"code": "SUB-GLUCOSE-PRICE-CHANGE",
	"display": "Price Change"
},
{
	"code": "SUB-GLUCOSE-OTHER-TYPE-IN",
	"display": "Other"
},
{
	"code": "SUB-VALVED-PRODUCT-NAME",
	"display": "Product Name Change"
},
{
	"code": "SUB-VALVED-PRODUCT-CHANGE",
	"display": "Product Change"
},
{
	"code": "SUB-VALVED-OWNERSHIP",
	"display": "Ownership Change"
},
{
	"code": "SUB-VALVED-DISTRIBUTOR",
	"display": "Distributor Change"
},
{
	"code": "SUB-VALVED-ORG-NAME",
	"display": "Organization Name Change"
},
{
	"code": "SUB-VALVED-PRODUCT-DISCONTINUE",
	"display": "Product Discontinuation"
},
{
	"code": "SUB-VALVED-DELISTING",
	"display": "Delisting"
},
{
	"code": "SUB-VALVED-PRICE-CHANGE",
	"display": "Price Change"
},
{
	"code": "SUB-VALVED-OTHER-TYPE-IN",
	"display": "Other"
},
{
	"code": "SUB-NATURAL-PRODUCT-NAME",
	"display": "Product Name Change"
},
{
	"code": "SUB-NATURAL-PRODUCT-CHANGE",
	"display": "Product Change"
},
{
	"code": "SUB-NATURAL-OWNERSHIP",
	"display": "Ownership Change"
},
{
	"code": "SUB-NATURAL-DISTRIBUTOR",
	"display": "Distributor Change"
},
{
	"code": "SUB-NATURAL-ORG-NAME",
	"display": "Organization Name Change"
},
{
	"code": "SUB-NATURAL-PRODUCT-DISCONTINUE",
	"display": "Product Discontinuation"
},
{
	"code": "SUB-NATURAL-DELISTING",
	"display": "Delisting"
},
{
	"code": "SUB-NATURAL-PRICE-CHANGE",
	"display": "Price Change"
},
{
	"code": "SUB-NATURAL-OTHER-TYPE-IN",
	"display": "Other"
},
{
	"code": "SUB-OTHER-PRODUCT-NAME",
	"display": "Product Name Change"
},
{
	"code": "SUB-OTHER-PRODUCT-CHANGE",
	"display": "Product Change"
},
{
	"code": "SUB-OTHER-OWNERSHIP",
	"display": "Ownership Change"
},
{
	"code": "SUB-OTHER-DISTRIBUTOR",
	"display": "Distributor Change"
},
{
	"code": "SUB-OTHER-ORG-NAME",
	"display": "Organization Name Change"
},
{
	"code": "SUB-OTHER-PRODUCT-DISCONTINUE",
	"display": "Product Discontinuation"
},
{
	"code": "SUB-OTHER-DELISTING",
	"display": "Delisting"
},
{
	"code": "SUB-OTHER-PRICE-CHANGE",
	"display": "Price Change"
},
{
	"code": "SUB-OTHER-OTHER-TYPE-IN",
	"display": "Other"
},
{
	"code": "SUB-NUTRITIONAL-PRODUCT-NAME",
	"display": "Product Name Change"
},
{
	"code": "SUB-NUTRITIONAL-FORMULATION",
	"display": "Formulation Change"
},
{
	"code": "SUB-NUTRITIONAL-DRUG-REGULATION",
	"display": "Change in Food and Drug Regulation Classification"
},
{
	"code": "SUB-ADVERTISING-POLICY",
	"display": "Change in Advertising Policy"
},
{
	"code": "SUB-NUTRITIONAL-OWNERSHIP",
	"display": "Ownership Change"
},
{
	"code": "SUB-NUTRITIONAL-DISTRIBUTOR",
	"display": "Distributor Change"
},
{
	"code": "SUB-NUTRITIONAL-ORG-NAME",
	"display": "Organization Name Change"
},
{
	"code": "SUB-NUTRITIONAL-PRODUCT-DISCONTINUE",
	"display": "Product Discontinuation"
},
{
	"code": "SUB-NUTRITIONAL-DELISTING",
	"display": "Delisting"
},
{
	"code": "SUB-NUTRITIONAL-PRICE-CHANGE",
	"display": "Price Change"
},
{
	"code": "SUB-NUTRITIONAL-OTHER-TYPE-IN",
	"display": "Other"
}
]
}
""";
	cs = parser.parseResource(CodeSystem.class, csStr);
}

RequestDetails rq = new SystemRequestDetails();

// create our ValueSet
myValueSetDao.update(vs, rq);

// and the code system
myCodeSystemDao.update(cs, rq);

// sanity check to make sure our code system was actually created
SearchParameterMap spMap = new SearchParameterMap();
spMap.setLoadSynchronous(true);
IBundleProvider bp = myCodeSystemDao.search(spMap, rq);
assertEquals(1, bp.getAllResources().size());
IBaseResource baseResource = bp.getAllResources().get(0);
CodeSystem cssaved;
if (baseResource instanceof CodeSystem saved) {
	cssaved = saved;
} else {
	fail("Should be a code system");
	return;
}
assertEquals(cs.getConcept().size(), cssaved.getConcept().size());

// test
// perform the reindex (we'll only target the CodeSystem here)
ReindexJobParameters params = new ReindexJobParameters();
params.addUrl("CodeSystem?");
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(params);

// and wait for it to complete
Batch2JobStartResponse response = myJobCoordinator.startInstance(rq, startRequest);
myBatch2JobHelper.awaitJobCompletion(response);

// verify by doing the value expansion
ValueSetExpansionOptions options = new ValueSetExpansionOptions();
options.setCount(200); // this is way more than exist, so it's ok
ValueSet expanded = myValueSetDao.expand(vs, options);
assertNotNull(expanded);

/*
 * If the reindex was performed correctly, the expanded ValueSet
 * should contain all the CodeSystem concepts that we originally
 * uploaded (and nothing else).
 */
HashSet<String> all = new HashSet<>();
for (CodeSystem.ConceptDefinitionComponent set : cs.getConcept()) {
	all.add(set.getCode());
}
for (ValueSet.ValueSetExpansionContainsComponent v : expanded.getExpansion().getContains()) {
	all.remove(v.getCode());
}
assertTrue(all.isEmpty(), String.join(", ", all));
assertEquals(cs.getConcept().size(), expanded.getExpansion().getTotal());
} finally {
	// set back to standard values
	myStorageSettings.setDeferIndexingForCodesystemsOfSize(deferredIndexingDefault);
	ReindexUtils.setRetryDelay(null);
}
}
}
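For context: the finally block above restores two settings, which implies the setup portion of this test (outside this diff hunk) first lowers them so that concept indexing is deferred. A minimal sketch of that setup, assuming the same myStorageSettings and deferredIndexingDefault names used above; the threshold and delay values are illustrative, not taken from the source:

int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize();
// Defer indexing for any CodeSystem larger than a handful of concepts,
// forcing the ~46 concepts above onto the deferred-storage queue (assumed threshold)
myStorageSettings.setDeferIndexingForCodesystemsOfSize(5);
// Poll quickly in tests rather than waiting the 30s production default
ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));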
@@ -19,6 +19,7 @@
 */
package ca.uhn.fhir.jpa.test;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.jobs.export.BulkDataExportProvider;
import ca.uhn.fhir.context.FhirContext;

@@ -559,6 +560,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil

	@Autowired
	protected IJobMaintenanceService myJobMaintenanceService;

	@Autowired
	protected IJobCoordinator myJobCoordinator;

	@RegisterExtension
	private final PreventDanglingInterceptorsExtension myPreventDanglingInterceptorsExtension = new PreventDanglingInterceptorsExtension(() -> myInterceptorRegistry);
@@ -0,0 +1,66 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;

public class BaseReindexStep {

	public static final int REINDEX_MAX_RETRIES = 10;

	protected final HapiTransactionService myHapiTransactionService;

	protected final IFhirSystemDao<?, ?> mySystemDao;

	protected final DaoRegistry myDaoRegistry;

	protected final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;

	public BaseReindexStep(
			HapiTransactionService theHapiTransactionService,
			IFhirSystemDao<?, ?> theSystemDao,
			DaoRegistry theRegistry,
			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
		myHapiTransactionService = theHapiTransactionService;
		mySystemDao = theSystemDao;
		myDaoRegistry = theRegistry;
		myIdHelperService = theIdHelperService;
	}

	public ReindexResults doReindex(
			ResourceIdListWorkChunkJson data,
			IJobDataSink<?> theDataSink,
			String theInstanceId,
			String theChunkId,
			ReindexJobParameters theJobParameters) {
		RequestDetails requestDetails = new SystemRequestDetails();
		requestDetails.setRetry(true);
		requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);

		TransactionDetails transactionDetails = new TransactionDetails();
		ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
		jp.setData(data)
				.setRequestDetails(requestDetails)
				.setTransactionDetails(transactionDetails)
				.setDataSink(theDataSink)
				.setInstanceId(theInstanceId)
				.setChunkId(theChunkId)
				.setJobParameters(theJobParameters);

		ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);

		return myHapiTransactionService
				.withRequest(requestDetails)
				.withTransactionDetails(transactionDetails)
				.withRequestPartitionId(data.getRequestPartitionId())
				.execute(reindexJob);
	}
}
@@ -0,0 +1,40 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import jakarta.annotation.Nonnull;

public class CheckPendingReindexWorkStep implements IJobStepWorker<ReindexJobParameters, ReindexResults, VoidModel> {

	private final ReindexJobService myReindexJobService;

	public CheckPendingReindexWorkStep(ReindexJobService theReindexJobService) {
		myReindexJobService = theReindexJobService;
	}

	@Nonnull
	@Override
	public RunOutcome run(
			@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexResults> theStepExecutionDetails,
			@Nonnull IJobDataSink<VoidModel> theDataSink)
			throws JobExecutionFailedException {

		ReindexResults results = theStepExecutionDetails.getData();

		if (!results.getResourceToHasWorkToComplete().isEmpty()) {
			if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceToHasWorkToComplete())) {
				// give time for reindex work to complete
				throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay());
			}
		}

		return RunOutcome.SUCCESS;
	}
}
@@ -26,12 +26,20 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@@ -40,8 +48,47 @@ public class ReindexAppCtx {

	public static final String JOB_REINDEX = "REINDEX";

	@Autowired
	private HapiTransactionService myHapiTransactionService;

	@Autowired
	private IFhirSystemDao<?, ?> mySystemDao;

	@Autowired
	private DaoRegistry myRegistry;

	@Autowired
	private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;

	// Version 2
	@Bean
	public JobDefinition<ReindexJobParameters> reindexJobDefinitionV2(IBatch2DaoSvc theBatch2DaoSvc) {
		return JobDefinition.newBuilder()
				.setJobDefinitionId(JOB_REINDEX)
				.setJobDescription("Reindex resources")
				.setJobDefinitionVersion(2)
				.setParametersType(ReindexJobParameters.class)
				.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
				.gatedExecution()
				.addFirstStep(
						"generate-ranges",
						"Generate data ranges to reindex",
						ChunkRangeJson.class,
						reindexGenerateRangeChunksStep())
				.addIntermediateStep(
						"load-ids",
						"Load IDs of resources to reindex",
						ResourceIdListWorkChunkJson.class,
						reindexLoadIdsStep(theBatch2DaoSvc))
				.addIntermediateStep(
						"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
				.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
				.build();
	}

	// Version 1
	@Bean
	public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1(IBatch2DaoSvc theBatch2DaoSvc) {
		return JobDefinition.newBuilder()
				.setJobDefinitionId(JOB_REINDEX)
				.setJobDescription("Reindex resources")

@@ -59,9 +106,7 @@ public class ReindexAppCtx {
						"Load IDs of resources to reindex",
						ResourceIdListWorkChunkJson.class,
						reindexLoadIdsStep(theBatch2DaoSvc))
				.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
				.build();
	}

@@ -83,8 +128,18 @@ public class ReindexAppCtx {
	}

	@Bean
	public ReindexStepV1 reindexStepV1() {
		return new ReindexStepV1(myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
	}

	@Bean
	public ReindexStepV2 reindexStepV2() {
		return new ReindexStepV2(jobService(), myHapiTransactionService, mySystemDao, myRegistry, myIdHelperService);
	}

	@Bean
	public CheckPendingReindexWorkStep pendingWorkStep() {
		return new CheckPendingReindexWorkStep(jobService());
	}

	@Bean

@@ -94,4 +149,9 @@ public class ReindexAppCtx {
			IJobPartitionProvider theJobPartitionHandler) {
		return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler);
	}

	@Bean
	public ReindexJobService jobService() {
		return new ReindexJobService(myRegistry);
	}
}
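Both definitions above share the JOB_REINDEX definition id, so callers start a reindex the same way regardless of the job version; only the step pipeline differs. A usage sketch, mirroring the test earlier in this diff (myJobCoordinator and myBatch2JobHelper are the injected test helpers shown there):

// Reindex all CodeSystems and wait for the job to finish
ReindexJobParameters params = new ReindexJobParameters();
params.addUrl("CodeSystem?");

JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(params);

Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
myBatch2JobHelper.awaitJobCompletion(response);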
@@ -1,203 +0,0 @@
/*-
 * #%L
 * hapi-fhir-storage-batch2-jobs
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {

	public static final int REINDEX_MAX_RETRIES = 10;

	private static final Logger ourLog = LoggerFactory.getLogger(ReindexStep.class);

	@Autowired
	private HapiTransactionService myHapiTransactionService;

	@Autowired
	private IFhirSystemDao<?, ?> mySystemDao;

	@Autowired
	private DaoRegistry myDaoRegistry;

	@Autowired
	private IIdHelperService<IResourcePersistentId> myIdHelperService;

	@Nonnull
	@Override
	public RunOutcome run(
			@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
			@Nonnull IJobDataSink<VoidModel> theDataSink)
			throws JobExecutionFailedException {

		ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
		ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();

		return doReindex(
				data,
				theDataSink,
				theStepExecutionDetails.getInstance().getInstanceId(),
				theStepExecutionDetails.getChunkId(),
				jobParameters);
	}

	@Nonnull
	public RunOutcome doReindex(
			ResourceIdListWorkChunkJson data,
			IJobDataSink<VoidModel> theDataSink,
			String theInstanceId,
			String theChunkId,
			ReindexJobParameters theJobParameters) {
		RequestDetails requestDetails = new SystemRequestDetails();
		requestDetails.setRetry(true);
		requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
		TransactionDetails transactionDetails = new TransactionDetails();
		ReindexJob reindexJob = new ReindexJob(
				data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters);

		myHapiTransactionService
				.withRequest(requestDetails)
				.withTransactionDetails(transactionDetails)
				.withRequestPartitionId(data.getRequestPartitionId())
				.execute(reindexJob);

		return new RunOutcome(data.size());
	}

	private class ReindexJob implements TransactionCallback<Void> {
		private final ResourceIdListWorkChunkJson myData;
		private final RequestDetails myRequestDetails;
		private final TransactionDetails myTransactionDetails;
		private final IJobDataSink<VoidModel> myDataSink;
		private final String myChunkId;
		private final String myInstanceId;
		private final ReindexJobParameters myJobParameters;

		public ReindexJob(
				ResourceIdListWorkChunkJson theData,
				RequestDetails theRequestDetails,
				TransactionDetails theTransactionDetails,
				IJobDataSink<VoidModel> theDataSink,
				String theInstanceId,
				String theChunkId,
				ReindexJobParameters theJobParameters) {
			myData = theData;
			myRequestDetails = theRequestDetails;
			myTransactionDetails = theTransactionDetails;
			myDataSink = theDataSink;
			myInstanceId = theInstanceId;
			myChunkId = theChunkId;
			myJobParameters = theJobParameters;
			myDataSink.setWarningProcessor(new ReindexWarningProcessor());
		}

		@Override
		public Void doInTransaction(@Nonnull TransactionStatus theStatus) {

			List<IResourcePersistentId> persistentIds = myData.getResourcePersistentIds(myIdHelperService);

			ourLog.info(
					"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
					persistentIds.size(),
					myInstanceId,
					myChunkId);
			StopWatch sw = new StopWatch();

			// Prefetch Resources from DB

			boolean reindexSearchParameters =
					myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
			mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
			ourLog.info(
					"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
					persistentIds.size(),
					sw,
					myInstanceId,
					myChunkId);

			ReindexParameters parameters = new ReindexParameters()
					.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
					.setOptimizeStorage(myJobParameters.getOptimizeStorage())
					.setOptimisticLock(myJobParameters.getOptimisticLock());

			// Reindex

			sw.restart();
			for (int i = 0; i < myData.size(); i++) {

				String nextResourceType = myData.getResourceType(i);
				IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
				IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
				try {

					ReindexOutcome outcome =
							dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
					outcome.getWarnings().forEach(myDataSink::recoveredError);

				} catch (BaseServerResponseException | DataFormatException e) {
					String resourceForcedId = myIdHelperService
							.translatePidIdToForcedIdWithCache(resourcePersistentId)
							.orElse(resourcePersistentId.toString());
					String resourceId = nextResourceType + "/" + resourceForcedId;
					ourLog.debug("Failure during reindexing {}", resourceId, e);
					myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
				}
			}

			ourLog.info(
					"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
					persistentIds.size(),
					sw,
					sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
					myInstanceId,
					myChunkId);

			return null;
		}
	}
}
@@ -0,0 +1,70 @@
/*-
 * #%L
 * hapi-fhir-storage-batch2-jobs
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReindexStepV1 extends BaseReindexStep
		implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {

	private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class);

	public ReindexStepV1(
			HapiTransactionService theHapiTransactionService,
			IFhirSystemDao<?, ?> theSystemDao,
			DaoRegistry theRegistry,
			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
		super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
	}

	@Nonnull
	@Override
	public RunOutcome run(
			@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
			@Nonnull IJobDataSink<VoidModel> theDataSink)
			throws JobExecutionFailedException {

		ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
		ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();

		doReindex(
				data,
				theDataSink,
				theStepExecutionDetails.getInstance().getInstanceId(),
				theStepExecutionDetails.getChunkId(),
				jobParameters);

		return new RunOutcome(data.size());
	}
}
@@ -0,0 +1,70 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.batch2.jobs.reindex.svcs.ReindexJobService;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import jakarta.annotation.Nonnull;

import java.util.HashMap;
import java.util.Map;

public class ReindexStepV2 extends BaseReindexStep
		implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {

	private final ReindexJobService myReindexJobService;

	public ReindexStepV2(
			ReindexJobService theJobService,
			HapiTransactionService theHapiTransactionService,
			IFhirSystemDao<?, ?> theSystemDao,
			DaoRegistry theRegistry,
			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
		super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
		myReindexJobService = theJobService;
	}

	@Nonnull
	@Override
	public RunOutcome run(
			@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
			@Nonnull IJobDataSink<ReindexResults> theDataSink)
			throws JobExecutionFailedException {
		ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
		ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();

		// This is not strictly necessary, but we'll ensure that no outstanding
		// "reindex work" is waiting to be completed, so that when we do our
		// reindex work here, it won't skip over that data
		Map<String, Boolean> resourceTypesToCheckFlag = new HashMap<>();
		data.getTypedPids().forEach(id -> {
			// we don't really care about duplicates; we check by resource type
			resourceTypesToCheckFlag.put(id.getResourceType(), true);
		});
		if (myReindexJobService.anyResourceHasPendingReindexWork(resourceTypesToCheckFlag)) {
			throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay());
		}

		ReindexResults results = doReindex(
				data,
				theDataSink,
				theStepExecutionDetails.getInstance().getInstanceId(),
				theStepExecutionDetails.getChunkId(),
				jobParameters);

		theDataSink.accept(results);

		return new RunOutcome(data.size());
	}
}
@@ -0,0 +1,199 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.models.ReindexResults;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ReindexTask implements TransactionCallback<ReindexResults> {
	private static final Logger ourLog = LoggerFactory.getLogger(ReindexTask.class);

	public static class JobParameters {
		private ResourceIdListWorkChunkJson myData;
		private RequestDetails myRequestDetails;
		private TransactionDetails myTransactionDetails;
		private IJobDataSink<?> myDataSink;
		private String myChunkId;
		private String myInstanceId;
		private ReindexJobParameters myJobParameters;

		public ResourceIdListWorkChunkJson getData() {
			return myData;
		}

		public JobParameters setData(ResourceIdListWorkChunkJson theData) {
			myData = theData;
			return this;
		}

		public RequestDetails getRequestDetails() {
			return myRequestDetails;
		}

		public JobParameters setRequestDetails(RequestDetails theRequestDetails) {
			myRequestDetails = theRequestDetails;
			return this;
		}

		public TransactionDetails getTransactionDetails() {
			return myTransactionDetails;
		}

		public JobParameters setTransactionDetails(TransactionDetails theTransactionDetails) {
			myTransactionDetails = theTransactionDetails;
			return this;
		}

		public IJobDataSink<?> getDataSink() {
			return myDataSink;
		}

		public JobParameters setDataSink(IJobDataSink<?> theDataSink) {
			myDataSink = theDataSink;
			return this;
		}

		public String getChunkId() {
			return myChunkId;
		}

		public JobParameters setChunkId(String theChunkId) {
			myChunkId = theChunkId;
			return this;
		}

		public String getInstanceId() {
			return myInstanceId;
		}

		public JobParameters setInstanceId(String theInstanceId) {
			myInstanceId = theInstanceId;
			return this;
		}

		public ReindexJobParameters getJobParameters() {
			return myJobParameters;
		}

		public JobParameters setJobParameters(ReindexJobParameters theJobParameters) {
			myJobParameters = theJobParameters;
			return this;
		}
	}

	private final DaoRegistry myDaoRegistry;
	private final IFhirSystemDao<?, ?> mySystemDao;

	private final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;

	private final ResourceIdListWorkChunkJson myData;
	private final RequestDetails myRequestDetails;
	private final TransactionDetails myTransactionDetails;
	private final IJobDataSink<?> myDataSink;
	private final String myChunkId;
	private final String myInstanceId;
	private final ReindexJobParameters myJobParameters;

	public ReindexTask(
			JobParameters theJobParameters,
			DaoRegistry theRegistry,
			IFhirSystemDao<?, ?> theSystemDao,
			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
		myDaoRegistry = theRegistry;
		mySystemDao = theSystemDao;
		myIdHelperService = theIdHelperService;

		myData = theJobParameters.getData();
		myRequestDetails = theJobParameters.getRequestDetails();
		myTransactionDetails = theJobParameters.getTransactionDetails();
		myDataSink = theJobParameters.getDataSink();
		myInstanceId = theJobParameters.getInstanceId();
		myChunkId = theJobParameters.getChunkId();
		myJobParameters = theJobParameters.getJobParameters();
		myDataSink.setWarningProcessor(new ReindexWarningProcessor());
	}

	@Override
	public ReindexResults doInTransaction(@Nonnull TransactionStatus theStatus) {
		List<IResourcePersistentId<?>> persistentIds = myData.getResourcePersistentIds(myIdHelperService);

		ourLog.info(
				"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
				persistentIds.size(),
				myInstanceId,
				myChunkId);
		StopWatch sw = new StopWatch();
		ReindexResults reindexResults = new ReindexResults();

		// Prefetch Resources from DB
		boolean reindexSearchParameters =
				myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
		mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
		ourLog.info(
				"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
				persistentIds.size(),
				sw,
				myInstanceId,
				myChunkId);

		ReindexParameters parameters = new ReindexParameters()
				.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
				.setOptimizeStorage(myJobParameters.getOptimizeStorage())
				.setOptimisticLock(myJobParameters.getOptimisticLock());

		// Reindex

		sw.restart();
		for (int i = 0; i < myData.size(); i++) {

			String nextResourceType = myData.getResourceType(i);
			IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
			IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
			try {

				ReindexOutcome outcome =
						dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);

				outcome.getWarnings().forEach(myDataSink::recoveredError);
				reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork());

			} catch (BaseServerResponseException | DataFormatException e) {
				String resourceForcedId = myIdHelperService
						.translatePidIdToForcedIdWithCache(resourcePersistentId)
						.orElse(resourcePersistentId.toString());
				String resourceId = nextResourceType + "/" + resourceForcedId;
				ourLog.error("Failure during reindexing {}", resourceId, e);
				myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
			}
		}

		ourLog.info(
				"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
				persistentIds.size(),
				sw,
				sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
				myInstanceId,
				myChunkId);

		return reindexResults;
	}
}
@@ -0,0 +1,32 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import com.google.common.annotations.VisibleForTesting;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class ReindexUtils {

	private static final Duration RETRY_DELAY = Duration.of(30, ChronoUnit.SECONDS);

	private static Duration myDelay;

	/**
	 * Returns the retry delay for reindex jobs that require polling.
	 */
	public static Duration getRetryLaterDelay() {
		if (myDelay != null) {
			return myDelay;
		}
		return RETRY_DELAY;
	}

	/**
	 * Sets the retry delay to use for reindex jobs.
	 * Do not use this in production code! Only test code.
	 */
	@VisibleForTesting
	public static void setRetryDelay(Duration theDuration) {
		myDelay = theDuration;
	}
}
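The @VisibleForTesting setter above is how the test earlier in this diff avoids waiting the full 30-second delay between retries of the pending-work check. The pattern is to shorten the delay for the test and always restore the default afterwards; a sketch (the 250ms value is illustrative):

ReindexUtils.setRetryDelay(Duration.of(250, ChronoUnit.MILLIS)); // test-only hook
try {
	// ... start the reindex job and make assertions ...
} finally {
	ReindexUtils.setRetryDelay(null); // revert to the 30s production default
}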
@@ -0,0 +1,29 @@
package ca.uhn.fhir.batch2.jobs.reindex.models;

import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;

public class ReindexResults implements IModelJson {

	/**
	 * A map of resource type to pending-work status:
	 * true = more reindex work is needed; false (or omitted) = reindexing is done for that type.
	 */
	@JsonProperty("resource2NeedsWork")
	private HashMap<String, Boolean> myResourceToHasWorkToComplete;

	public ReindexResults() {}

	public HashMap<String, Boolean> getResourceToHasWorkToComplete() {
		if (myResourceToHasWorkToComplete == null) {
			myResourceToHasWorkToComplete = new HashMap<>();
		}
		return myResourceToHasWorkToComplete;
	}

	public void addResourceTypeToCompletionStatus(String theResourceType, boolean theRequiresMoreWork) {
		getResourceToHasWorkToComplete().put(theResourceType, theRequiresMoreWork);
	}
}
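ReindexResults is the chunk payload that carries the per-type pending-work flags from the reindex step to the final check step. A sketch of both sides of that handoff, assuming an outcome returned by a DAO reindex call and an injected ReindexJobService as shown elsewhere in this diff:

// Producer side (ReindexTask): record whether each resource type still has deferred work
ReindexResults results = new ReindexResults();
results.addResourceTypeToCompletionStatus("CodeSystem", outcome.isHasPendingWork());

// Consumer side (CheckPendingReindexWorkStep): retry the chunk until nothing is pending
if (myReindexJobService.anyResourceHasPendingReindexWork(results.getResourceToHasWorkToComplete())) {
	throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay());
}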
@@ -0,0 +1,38 @@
package ca.uhn.fhir.batch2.jobs.reindex.svcs;

import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;

import java.util.Map;

public class ReindexJobService {

	private final DaoRegistry myDaoRegistry;

	public ReindexJobService(DaoRegistry theRegistry) {
		myDaoRegistry = theRegistry;
	}

	/**
	 * Checks whether any of the resource types in the map have pending reindex work waiting.
	 * This returns true on the first such encounter, and returns false only if no
	 * reindex work is required for any resource type.
	 * @param theResourceTypesToCheckFlag map of resourceType : whether or not to check it
	 * @return true if there is reindex work pending, false otherwise
	 */
	public boolean anyResourceHasPendingReindexWork(Map<String, Boolean> theResourceTypesToCheckFlag) {
		for (String resourceType : theResourceTypesToCheckFlag.keySet()) {
			boolean toCheck = theResourceTypesToCheckFlag.get(resourceType);
			if (toCheck) {
				IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);

				ReindexJobStatus status = dao.getReindexJobStatus();
				if (status.isHasReindexWorkPending()) {
					return true;
				}
			}
		}
		return false;
	}
}
@@ -13,7 +13,7 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(MockitoExtension.class)
public class ReindexTaskParametersValidatorTest {

	@Mock
	private UrlListValidator myListValidator;
@@ -62,7 +62,7 @@ public class ResourceIdListWorkChunkJson implements IModelJson {
		return myRequestPartitionId;
	}

	public List<TypedPidJson> getTypedPids() {
		if (myTypedPids == null) {
			myTypedPids = new ArrayList<>();
		}
@@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;

@@ -315,12 +316,24 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
	 * @param theResourcePersistentId The ID
	 * @return
	 */
	@SuppressWarnings("rawtypes")
	ReindexOutcome reindex(
			IResourcePersistentId theResourcePersistentId,
			ReindexParameters theReindexParameters,
			RequestDetails theRequest,
			TransactionDetails theTransactionDetails);

	/**
	 * Returns a ReindexJobStatus object that tells the caller
	 * whether a reindex job is still in progress.
	 *
	 * If the implementing DAO requires additional work during reindexing,
	 * this is the method to override.
	 */
	default ReindexJobStatus getReindexJobStatus() {
		return ReindexJobStatus.NO_WORK_NEEDED;
	}

	void removeTag(
			IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode, RequestDetails theRequestDetails);
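Any DAO that defers part of its reindexing can override the default above to report in-progress work. A hypothetical sketch of such an override; the class and its deferred-storage dependency are illustrative only, not part of this change:

// Hypothetical DAO that defers some of its indexing work to a background queue
public class MyDeferredResourceDao<T extends IBaseResource> extends BaseHapiFhirResourceDao<T> {

	@Autowired
	private ITermDeferredStorageSvc myTermDeferredStorageSvc; // illustrative dependency

	@Override
	public ReindexJobStatus getReindexJobStatus() {
		// Report pending work for as long as the deferred queue still has entries
		ReindexJobStatus status = new ReindexJobStatus();
		status.setHasReindexWorkPending(!myTermDeferredStorageSvc.isStorageQueueEmpty(false));
		return status;
	}
}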
@@ -29,6 +29,11 @@ public class ReindexOutcome {

	private List<String> myWarnings;

	/**
	 * True if there is additional (async) work to wait on.
	 */
	private boolean myHasPendingWork;

	public List<String> getWarnings() {
		return defaultIfNull(myWarnings, Collections.emptyList());
	}

@@ -39,4 +44,12 @@ public class ReindexOutcome {
		}
		myWarnings.add(theWarning);
	}

	public boolean isHasPendingWork() {
		return myHasPendingWork;
	}

	public void setHasPendingWork(boolean theHasPendingWork) {
		myHasPendingWork = theHasPendingWork;
	}
}
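The new flag gives a DAO's reindex(...) implementation a way to tell the job that some of its work was deferred rather than completed synchronously. A sketch of the producing side, with the deferred-work condition left hypothetical:

ReindexOutcome outcome = new ReindexOutcome();
// hasDeferredWorkRemaining is a hypothetical flag for whatever this DAO defers
if (hasDeferredWorkRemaining) {
	outcome.setHasPendingWork(true);
}
return outcome;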
@@ -0,0 +1,16 @@
package ca.uhn.fhir.jpa.api.model;

public class ReindexJobStatus {

	public static ReindexJobStatus NO_WORK_NEEDED = new ReindexJobStatus();

	private boolean myHasReindexWorkPending;

	public boolean isHasReindexWorkPending() {
		return myHasReindexWorkPending;
	}

	public void setHasReindexWorkPending(boolean theHasReindexWorkPending) {
		myHasReindexWorkPending = theHasReindexWorkPending;
	}
}