Reindex Batch Jobs stuck in QUEUED status after SearchParameter update (#5043)

* Reindex Batch Jobs stuck in QUEUED status after SearchParameter update - test added

* Reindex Batch Jobs stuck in QUEUED status after SearchParameter update - fix

* Reindex Batch Jobs stuck in QUEUED status after SearchParameter update - fix test

* Reindex Batch Jobs stuck in QUEUED status after SearchParameter update - send BatchJobWorkNotification after commit

* Reindex Batch Jobs stuck in QUEUED status after SearchParameter update - changelog fix

Co-authored-by: michaelabuckley <michaelabuckley@gmail.com>

---------

Co-authored-by: michaelabuckley <michaelabuckley@gmail.com>
This commit is contained in:
volodymyr-korzh 2023-06-29 12:29:37 -06:00 committed by GitHub
parent 6e0651e261
commit 93a41559b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 1 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5041
jira: SMILE-6428
title: "Fixed an issue where reindex batch jobs failed to start and were stuck in QUEUED status after a SearchParameter update."

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.delete.job; package ca.uhn.fhir.jpa.delete.job;
import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx; import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters; import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
@ -50,6 +51,9 @@ public class ReindexJobTest extends BaseJpaR4Test {
@Autowired @Autowired
private IJobCoordinator myJobCoordinator; private IJobCoordinator myJobCoordinator;
@Autowired
private IJobPersistence myJobPersistence;
private ReindexTestHelper myReindexTestHelper; private ReindexTestHelper myReindexTestHelper;
private PatientReindexTestHelper myPatientReindexTestHelper; private PatientReindexTestHelper myPatientReindexTestHelper;
@ -442,6 +446,23 @@ public class ReindexJobTest extends BaseJpaR4Test {
assertEquals("java.lang.Error: foo message", outcome.getErrorMessage()); assertEquals("java.lang.Error: foo message", outcome.getErrorMessage());
} }
@Test
public void testReindex_withReindexingUponSearchParameterChangeEnabled_reindexJobCompleted() {
// make sure the resources auto-reindex after the search parameter update is enabled
myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(true);
// create an Observation resource and SearchParameter for it to trigger re-indexing
myReindexTestHelper.createObservationWithCode();
myReindexTestHelper.createCodeSearchParameter();
// check that reindex job was created
List<JobInstance> jobInstances = myJobPersistence.fetchInstancesByJobDefinitionId(ReindexAppCtx.JOB_REINDEX, 10, 0);
assertEquals(1, jobInstances.size());
// check that the job is completed (not stuck in QUEUED status)
myBatch2JobHelper.awaitJobCompletion(jobInstances.get(0).getInstanceId());
}
private static Stream<Arguments> numResourcesParams(){ private static Stream<Arguments> numResourcesParams(){
return PatientReindexTestHelper.numResourcesParams(); return PatientReindexTestHelper.numResourcesParams();
} }

View File

@ -43,6 +43,8 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -129,13 +131,37 @@ public class JobCoordinatorImpl implements IJobCoordinator {
myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters())); myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(jobDefinition, instanceAndFirstChunk.jobInstanceId, instanceAndFirstChunk.workChunkId); JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(jobDefinition, instanceAndFirstChunk.jobInstanceId, instanceAndFirstChunk.workChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification); sendBatchJobWorkNotificationAfterCommit(workNotification);
Batch2JobStartResponse response = new Batch2JobStartResponse(); Batch2JobStartResponse response = new Batch2JobStartResponse();
response.setInstanceId(instanceAndFirstChunk.jobInstanceId); response.setInstanceId(instanceAndFirstChunk.jobInstanceId);
return response; return response;
} }
/**
* In order to make sure that the data is actually in the DB when JobWorkNotification is handled,
* this method registers a transaction synchronization that sends JobWorkNotification to Job WorkChannel
* if and when the current database transaction is successfully committed.
* If the transaction is rolled back, the JobWorkNotification will not be sent to the job WorkChannel.
*/
private void sendBatchJobWorkNotificationAfterCommit(final JobWorkNotification theJobWorkNotification) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
}
@Override
public void afterCommit() {
myBatchJobSender.sendWorkChannelMessage(theJobWorkNotification);
}
});
} else {
myBatchJobSender.sendWorkChannelMessage(theJobWorkNotification);
}
}
/** /**
* Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kickoff. * Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kickoff.
*/ */