adding changelog and cleaning some things up

leif stawnyczy 2024-09-26 16:46:42 -04:00
parent bddcea8db6
commit d677d83cce
13 changed files with 161 additions and 121 deletions

View File

@@ -0,0 +1,17 @@
+---
+type: fix
+issue: 6285
+title: "Updated the Reindex Batch2 job to include
+an additional step that checks that no pending
+'reindex' work remains.
+This was done to prevent a bug in which
+ValueSet expansion would not return all
+of the existing CodeSystem concepts after
+a reindex call, because some of the concepts
+had been deferred to future job runs.
+As such, `$reindex` operations on CodeSystems
+will no longer result in incorrect ValueSet
+expansion when such an expansion is requested
+'too soon' after the $reindex operation.
+"

View File

@@ -195,10 +195,10 @@ public class JpaResourceDaoCodeSystem<T extends IBaseResource> extends BaseHapiF
 	@SuppressWarnings("rawtypes")
 	@Override
 	public ReindexOutcome reindex(
-		IResourcePersistentId thePid,
-		ReindexParameters theReindexParameters,
-		RequestDetails theRequest,
-		TransactionDetails theTransactionDetails) {
+			IResourcePersistentId thePid,
+			ReindexParameters theReindexParameters,
+			RequestDetails theRequest,
+			TransactionDetails theTransactionDetails) {
 		ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails);
 		if (outcome.getWarnings().isEmpty()) {

View File

@@ -593,7 +593,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
 		if (theStatisticsTracker.getUpdatedConceptCount() <= myStorageSettings.getDeferIndexingForCodesystemsOfSize()) {
 			saveConcept(conceptToAdd);
 			Long nextConceptPid = conceptToAdd.getId();
-			Validate.notNull(nextConceptPid);
+			Objects.requireNonNull(nextConceptPid);
 		} else {
 			myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd);
 		}

View File

@@ -22,8 +22,6 @@ package ca.uhn.fhir.jpa.term;
 import ca.uhn.fhir.batch2.api.IJobCoordinator;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
-import ca.uhn.fhir.batch2.model.StatusEnum;
-import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
 import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
 import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
@@ -80,6 +78,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 	private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
 	private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
 	private boolean myAllowDeferredTasksTimeout = true;
+	private static final List<String> BATCH_JOBS_TO_CARE_ABOUT =
+			List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
 	private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
 	private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
 	private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
@@ -449,18 +449,18 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
 		 * This is mostly a fail-safe
 		 * because "cancelled" jobs are never removed.
 		 */
-		List<String> executions = new ArrayList<>(myJobExecutions);
 		List<String> idsToDelete = new ArrayList<>();
-		for (String id : executions) {
-			// TODO - might want to consider a "fetch all instances"
-			// JobInstanceFetchRequest fetchRequest = new JobInstanceFetchRequest();
-			// myJobCoordinator.fetchAllJobInstances()
-			JobInstance instance = myJobCoordinator.getInstance(id);
-			if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) {
+		for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) {
+			List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
+					jobId,
+					true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only)
+					Math.max(myJobExecutions.size(), 1), // at most this many
+					0);
+			for (JobInstance instance : jobInstanceInEndedState) {
 				idsToDelete.add(instance.getInstanceId());
 			}
 		}
 		for (String id : idsToDelete) {
 			myJobExecutions.remove(id);
 		}
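To make the bookkeeping above explicit: the service records the instance ids of the batch jobs it starts, and this fail-safe prunes ids whose instances have already reached an ended state, since cancelled jobs would otherwise be tracked forever. A hedged, self-contained sketch of that pattern follows; the TrackedJobs class and its method names are illustrative, while getInstancesbyJobDefinitionIdAndEndedStatus is the coordinator call actually used in the diff.

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class TrackedJobs {
	private final List<String> myJobExecutions = Collections.synchronizedList(new ArrayList<>());
	private final IJobCoordinator myJobCoordinator;
	private final List<String> myJobDefinitionIds;

	TrackedJobs(IJobCoordinator theCoordinator, List<String> theJobDefinitionIds) {
		myJobCoordinator = theCoordinator;
		myJobDefinitionIds = theJobDefinitionIds;
	}

	/** Drop tracked ids whose instances ended (COMPLETED, FAILED, CANCELLED). */
	void pruneEndedJobs() {
		List<String> idsToDelete = new ArrayList<>();
		for (String jobDefinitionId : myJobDefinitionIds) {
			// The page size must be at least 1, and the tracked list may be empty
			int pageSize = Math.max(myJobExecutions.size(), 1);
			myJobCoordinator
					.getInstancesbyJobDefinitionIdAndEndedStatus(jobDefinitionId, true, pageSize, 0)
					.forEach(instance -> idsToDelete.add(instance.getInstanceId()));
		}
		myJobExecutions.removeAll(idsToDelete);
	}

	boolean hasJobsInFlight() {
		return !myJobExecutions.isEmpty();
	}
}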

View File

@@ -61,8 +61,10 @@ import org.springframework.transaction.support.TransactionTemplate;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -2100,10 +2102,23 @@ public class ValueSetExpansionR4Test extends BaseTermR4Test implements IValueSet
 		int deferredIndexingDefault = myStorageSettings.getDeferIndexingForCodesystemsOfSize();
 		try {
-			// deferred count must be less than the number of concepts on the
-			// CodeSystem we will upload
+			/**
+			 * The deferred count must be less than the number of
+			 * concepts we are going to upload.
+			 * That way, when we do the reindex, the remaining
+			 * concepts are deferred to a later job run.
+			 *
+			 * See {@link TermCodeSystemStorageSvcImpl#addConceptInHierarchy(TermCodeSystemVersion, Collection, TermConcept, UploadStatistics, Map, int)}
+			 *
+			 * Our CodeSystem below has only 6 concepts to add,
+			 * so we set the deferred count to 3 (3 will be deferred).
+			 */
 			myStorageSettings.setDeferIndexingForCodesystemsOfSize(3);
-			ReindexUtils.setRetryDelay(Duration.of(500, ChronoUnit.MILLIS));
+			/*
+			 * We also set a short retry delay
+			 * so this test doesn't run too long.
+			 */
+			ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));
 			IParser parser = myFhirContext.newJsonParser();
@@ -2229,7 +2244,7 @@
 		/*
 		 * If the reindex was performed correctly, the expanded ValueSet
-		 * should contain all the CodeSystems that we originally
+		 * should contain all the CodeSystem concepts that we originally
 		 * uploaded (and nothing else).
 		 */
 		HashSet<String> all = new HashSet<>();

View File

@@ -25,11 +25,10 @@ public class BaseReindexStep {
 	protected final IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
 	public BaseReindexStep(
-		HapiTransactionService theHapiTransactionService,
-		IFhirSystemDao<?, ?> theSystemDao,
-		DaoRegistry theRegistry,
-		IIdHelperService<IResourcePersistentId<?>> theIdHelperService
-	) {
+			HapiTransactionService theHapiTransactionService,
+			IFhirSystemDao<?, ?> theSystemDao,
+			DaoRegistry theRegistry,
+			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
 		myHapiTransactionService = theHapiTransactionService;
 		mySystemDao = theSystemDao;
 		myDaoRegistry = theRegistry;
@@ -37,11 +36,11 @@ public class BaseReindexStep {
 	}
 	public ReindexResults doReindex(
-		ResourceIdListWorkChunkJson data,
-		IJobDataSink<?> theDataSink,
-		String theInstanceId,
-		String theChunkId,
-		ReindexJobParameters theJobParameters) {
+			ResourceIdListWorkChunkJson data,
+			IJobDataSink<?> theDataSink,
+			String theInstanceId,
+			String theChunkId,
+			ReindexJobParameters theJobParameters) {
 		RequestDetails requestDetails = new SystemRequestDetails();
 		requestDetails.setRetry(true);
 		requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
@@ -49,21 +48,19 @@ public class BaseReindexStep {
 		TransactionDetails transactionDetails = new TransactionDetails();
 		ReindexTask.JobParameters jp = new ReindexTask.JobParameters();
 		jp.setData(data)
-			.setRequestDetails(requestDetails)
-			.setTransactionDetails(transactionDetails)
-			.setDataSink(theDataSink)
-			.setInstanceId(theInstanceId)
-			.setChunkId(theChunkId)
-			.setJobParameters(theJobParameters);
+				.setRequestDetails(requestDetails)
+				.setTransactionDetails(transactionDetails)
+				.setDataSink(theDataSink)
+				.setInstanceId(theInstanceId)
+				.setChunkId(theChunkId)
+				.setJobParameters(theJobParameters);
-		ReindexTask reindexJob = new ReindexTask(
-			jp, myDaoRegistry, mySystemDao, myIdHelperService
-		);
+		ReindexTask reindexJob = new ReindexTask(jp, myDaoRegistry, mySystemDao, myIdHelperService);
 		return myHapiTransactionService
-			.withRequest(requestDetails)
-			.withTransactionDetails(transactionDetails)
-			.withRequestPartitionId(data.getRequestPartitionId())
-			.execute(reindexJob);
+				.withRequest(requestDetails)
+				.withTransactionDetails(transactionDetails)
+				.withRequestPartitionId(data.getRequestPartitionId())
+				.execute(reindexJob);
 	}
 }

View File

@@ -22,8 +22,9 @@ public class CheckPendingReindexWorkStep implements IJobStepWorker<ReindexJobPar
 	@Nonnull
 	@Override
 	public RunOutcome run(
-		@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexResults> theStepExecutionDetails,
-		@Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
+			@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexResults> theStepExecutionDetails,
+			@Nonnull IJobDataSink<VoidModel> theDataSink)
+			throws JobExecutionFailedException {
 		ReindexResults results = theStepExecutionDetails.getData();

View File

@@ -50,10 +50,13 @@ public class ReindexAppCtx {
 	@Autowired
 	private HapiTransactionService myHapiTransactionService;
+
+	@Autowired
+	private IFhirSystemDao<?, ?> mySystemDao;
 	@Autowired
 	private DaoRegistry myRegistry;
 	@Autowired
 	private IIdHelperService<IResourcePersistentId<?>> myIdHelperService;
@@ -77,13 +80,9 @@ public class ReindexAppCtx {
 						"Load IDs of resources to reindex",
 						ResourceIdListWorkChunkJson.class,
 						reindexLoadIdsStep(theBatch2DaoSvc))
-				.addIntermediateStep("reindex-start",
-						"Perform the resource reindex",
-						ReindexResults.class,
-						reindexStepV2())
-				.addLastStep("reindex-pending-work",
-						"Waits for reindex work to complete.",
-						pendingWorkStep())
+				.addIntermediateStep(
+						"reindex-start", "Perform the resource reindex", ReindexResults.class, reindexStepV2())
+				.addLastStep("reindex-pending-work", "Waits for reindex work to complete.", pendingWorkStep())
 				.build();
 	}
@@ -91,26 +90,24 @@ public class ReindexAppCtx {
 	@Bean
 	public JobDefinition<ReindexJobParameters> reindexJobDefinitionV1(IBatch2DaoSvc theBatch2DaoSvc) {
 		return JobDefinition.newBuilder()
-			.setJobDefinitionId(JOB_REINDEX)
-			.setJobDescription("Reindex resources")
-			.setJobDefinitionVersion(1)
-			.setParametersType(ReindexJobParameters.class)
-			.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
-			.gatedExecution()
-			.addFirstStep(
-				"generate-ranges",
-				"Generate data ranges to reindex",
-				ChunkRangeJson.class,
-				reindexGenerateRangeChunksStep())
-			.addIntermediateStep(
-				"load-ids",
-				"Load IDs of resources to reindex",
-				ResourceIdListWorkChunkJson.class,
-				reindexLoadIdsStep(theBatch2DaoSvc))
-			.addLastStep("reindex-start",
-				"Start the resource reindex",
-				reindexStepV1())
-			.build();
+				.setJobDefinitionId(JOB_REINDEX)
+				.setJobDescription("Reindex resources")
+				.setJobDefinitionVersion(1)
+				.setParametersType(ReindexJobParameters.class)
+				.setParametersValidator(reindexJobParametersValidator(theBatch2DaoSvc))
+				.gatedExecution()
+				.addFirstStep(
+						"generate-ranges",
+						"Generate data ranges to reindex",
+						ChunkRangeJson.class,
+						reindexGenerateRangeChunksStep())
+				.addIntermediateStep(
+						"load-ids",
+						"Load IDs of resources to reindex",
+						ResourceIdListWorkChunkJson.class,
+						reindexLoadIdsStep(theBatch2DaoSvc))
+				.addLastStep("reindex-start", "Start the resource reindex", reindexStepV1())
+				.build();
 	}
@Bean
@@ -147,9 +144,9 @@ public class ReindexAppCtx {
 	@Bean
 	public ReindexProvider reindexProvider(
-		FhirContext theFhirContext,
-		IJobCoordinator theJobCoordinator,
-		IJobPartitionProvider theJobPartitionHandler) {
+			FhirContext theFhirContext,
+			IJobCoordinator theJobCoordinator,
+			IJobPartitionProvider theJobPartitionHandler) {
 		return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler);
 	}
}

View File

@@ -35,11 +35,16 @@ import jakarta.annotation.Nonnull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-public class ReindexStepV1 extends BaseReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
+public class ReindexStepV1 extends BaseReindexStep
+		implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
 	private static final Logger ourLog = LoggerFactory.getLogger(ReindexStepV1.class);
-	public ReindexStepV1(HapiTransactionService theHapiTransactionService, IFhirSystemDao<?, ?> theSystemDao, DaoRegistry theRegistry, IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
+	public ReindexStepV1(
+			HapiTransactionService theHapiTransactionService,
+			IFhirSystemDao<?, ?> theSystemDao,
+			DaoRegistry theRegistry,
+			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
 		super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
 	}
@@ -53,14 +58,13 @@ public class ReindexStepV1 extends BaseReindexStep implements IJobStepWorker<Rei
 		ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
 		ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
 		doReindex(
 				data,
 				theDataSink,
 				theStepExecutionDetails.getInstance().getInstanceId(),
 				theStepExecutionDetails.getChunkId(),
 				jobParameters);
 		return new RunOutcome(data.size());
 	}
 }
}

View File

@@ -19,19 +19,27 @@ import jakarta.annotation.Nonnull;
 import java.util.HashMap;
 import java.util.Map;
-public class ReindexStepV2 extends BaseReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {
+public class ReindexStepV2 extends BaseReindexStep
+		implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, ReindexResults> {
 	private final ReindexJobService myReindexJobService;
-	public ReindexStepV2(ReindexJobService theJobService,
-			HapiTransactionService theHapiTransactionService, IFhirSystemDao<?, ?> theSystemDao, DaoRegistry theRegistry, IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
+	public ReindexStepV2(
+			ReindexJobService theJobService,
+			HapiTransactionService theHapiTransactionService,
+			IFhirSystemDao<?, ?> theSystemDao,
+			DaoRegistry theRegistry,
+			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
 		super(theHapiTransactionService, theSystemDao, theRegistry, theIdHelperService);
 		myReindexJobService = theJobService;
 	}
 	@Nonnull
 	@Override
-	public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexResults> theDataSink) throws JobExecutionFailedException {
+	public RunOutcome run(
+			@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
+			@Nonnull IJobDataSink<ReindexResults> theDataSink)
+			throws JobExecutionFailedException {
 		ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
 		ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
@@ -41,19 +49,19 @@ public class ReindexStepV2 extends BaseReindexStep implements IJobStepWorker<Rei
 		// our reindex work here, it won't skip over that data
 		Map<String, Boolean> resourceTypesToCheckFlag = new HashMap<>();
 		data.getTypedPids().forEach(id -> {
-			// we don't really care about duplicates; we check by resource type
-			resourceTypesToCheckFlag.put(id.getResourceType(), true);
-		});
+			// we don't really care about duplicates; we check by resource type
+			resourceTypesToCheckFlag.put(id.getResourceType(), true);
+		});
 		if (myReindexJobService.anyResourceHasPendingReindexWork(resourceTypesToCheckFlag)) {
 			throw new RetryChunkLaterException(ReindexUtils.getRetryLaterDelay());
 		}
 		ReindexResults results = doReindex(
-			data,
-			theDataSink,
-			theStepExecutionDetails.getInstance().getInstanceId(),
-			theStepExecutionDetails.getChunkId(),
-			jobParameters);
+				data,
+				theDataSink,
+				theStepExecutionDetails.getInstance().getInstanceId(),
+				theStepExecutionDetails.getChunkId(),
+				jobParameters);
 		theDataSink.accept(results);
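A design note on the gate above: throwing RetryChunkLaterException does not fail the chunk. Batch2 re-queues it and invokes run(...) again after the supplied delay, so this step effectively polls until the deferred terminology work has drained, and only then performs the reindex and emits ReindexResults downstream to the final pending-work step. Sketched in isolation below (a method fragment only, not a complete worker; hasPendingWork() and computeResults() are illustrative names):

// Illustrative fragment: defer this chunk until an external precondition
// holds, using the batch2 retry-later contract seen in the diff above.
if (hasPendingWork()) {
	// Not a failure: batch2 re-queues this chunk and re-invokes run()
	// after roughly the supplied delay.
	throw new RetryChunkLaterException(Duration.ofSeconds(30));
}
// Precondition holds: do the real work, then emit results downstream.
theDataSink.accept(computeResults());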

View File

@@ -114,11 +114,10 @@ public class ReindexTask implements TransactionCallback<ReindexResults> {
 	private final ReindexJobParameters myJobParameters;
 	public ReindexTask(
-		JobParameters theJobParameters,
-		DaoRegistry theRegistry,
-		IFhirSystemDao<?, ?> theSystemDao,
-		IIdHelperService<IResourcePersistentId<?>> theIdHelperService
-	) {
+			JobParameters theJobParameters,
+			DaoRegistry theRegistry,
+			IFhirSystemDao<?, ?> theSystemDao,
+			IIdHelperService<IResourcePersistentId<?>> theIdHelperService) {
 		myDaoRegistry = theRegistry;
 		mySystemDao = theSystemDao;
 		myIdHelperService = theIdHelperService;
@@ -138,28 +137,28 @@ public class ReindexTask implements TransactionCallback<ReindexResults> {
 		List<IResourcePersistentId<?>> persistentIds = myData.getResourcePersistentIds(myIdHelperService);
 		ourLog.info(
-			"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
-			persistentIds.size(),
-			myInstanceId,
-			myChunkId);
+				"Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]",
+				persistentIds.size(),
+				myInstanceId,
+				myChunkId);
 		StopWatch sw = new StopWatch();
 		ReindexResults reindexResults = new ReindexResults();
 		// Prefetch Resources from DB
 		boolean reindexSearchParameters =
-			myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
+				myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
 		mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
 		ourLog.info(
-			"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
-			persistentIds.size(),
-			sw,
-			myInstanceId,
-			myChunkId);
+				"Prefetched {} resources in {} - Instance[{}] Chunk[{}]",
+				persistentIds.size(),
+				sw,
+				myInstanceId,
+				myChunkId);
 		ReindexParameters parameters = new ReindexParameters()
-			.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
-			.setOptimizeStorage(myJobParameters.getOptimizeStorage())
-			.setOptimisticLock(myJobParameters.getOptimisticLock());
+				.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
+				.setOptimizeStorage(myJobParameters.getOptimizeStorage())
+				.setOptimisticLock(myJobParameters.getOptimisticLock());
 		// Reindex
@@ -172,16 +171,15 @@ public class ReindexTask implements TransactionCallback<ReindexResults> {
 			try {
 				ReindexOutcome outcome =
-					dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
+						dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
 				outcome.getWarnings().forEach(myDataSink::recoveredError);
-				reindexResults.addResourceTypeToCompletionStatus(nextResourceType,
-					outcome.isHasPendingWork());
+				reindexResults.addResourceTypeToCompletionStatus(nextResourceType, outcome.isHasPendingWork());
 			} catch (BaseServerResponseException | DataFormatException e) {
 				String resourceForcedId = myIdHelperService
-					.translatePidIdToForcedIdWithCache(resourcePersistentId)
-					.orElse(resourcePersistentId.toString());
+						.translatePidIdToForcedIdWithCache(resourcePersistentId)
+						.orElse(resourcePersistentId.toString());
 				String resourceId = nextResourceType + "/" + resourceForcedId;
 				ourLog.error("Failure during reindexing {}", resourceId, e);
 				myDataSink.recoveredError("Failure reindexing " + resourceId + ": " + e.getMessage());
@@ -189,12 +187,12 @@ public class ReindexTask implements TransactionCallback<ReindexResults> {
 		}
 		ourLog.info(
-			"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
-			persistentIds.size(),
-			sw,
-			sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
-			myInstanceId,
-			myChunkId);
+				"Finished reindexing {} resources in {} - {}/sec - Instance[{}] Chunk[{}]",
+				persistentIds.size(),
+				sw,
+				sw.formatThroughput(persistentIds.size(), TimeUnit.SECONDS),
+				myInstanceId,
+				myChunkId);
 		return reindexResults;
 	}

View File

@@ -21,6 +21,10 @@ public class ReindexUtils {
 		return RETRY_DELAY;
 	}
+	/**
+	 * Sets the retry delay to use for reindex jobs.
+	 * Do not use this in production code! Only test code.
+	 */
 	@VisibleForTesting
 	public static void setRetryDelay(Duration theDuration) {
 		myDelay = theDuration;
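Typical test usage, mirroring the ValueSetExpansionR4Test change earlier in this commit: shorten the delay for the test, then restore the default afterwards. A hedged sketch; whether passing null restores the built-in default is an assumption based on the getter's fallback, not shown in this hunk.

// Test setup: make RetryChunkLaterException re-deliveries fast.
ReindexUtils.setRetryDelay(Duration.of(300, ChronoUnit.MILLIS));
try {
	// ... upload the CodeSystem, run $reindex, then $expand ...
} finally {
	// Assumption: clearing the override falls back to the built-in default.
	ReindexUtils.setRetryDelay(null);
}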

View File

@@ -24,7 +24,6 @@ public class ReindexResults implements IModelJson {
 	}
 	public void addResourceTypeToCompletionStatus(String theResourceType, boolean theRequiresMoreWork) {
-		getResourceToHasWorkToComplete()
-			.put(theResourceType, theRequiresMoreWork);
+		getResourceToHasWorkToComplete().put(theResourceType, theRequiresMoreWork);
 	}
 }