diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6210-add-chunk-id-to-delete-expunge-log-msg.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6210-add-chunk-id-to-delete-expunge-log-msg.yaml
new file mode 100644
index 00000000000..e526840512a
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6210-add-chunk-id-to-delete-expunge-log-msg.yaml
@@ -0,0 +1,6 @@
+---
+type: add
+issue: 6210
+jira: SMILE-8428
+title: "Batch instance ID and chunk ID have been added to the logging context so that they can be automatically added to
+batch-related messages in the log."
diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md
index 1c3bb485c21..03468909d7f 100644
--- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md
@@ -70,3 +70,7 @@ If a Job Definition is set to having Gated Execution, then all work chunks for a
 ### Job Instance Completion
 
 A Batch Job Maintenance Service runs every minute to monitor the status of all Job Instances and the Job Instance is transitioned to either `COMPLETED`, `ERRORED` or `FAILED` according to the status of all outstanding work chunks for that job instance. If the job instance is still `IN_PROGRESS` this maintenance service also estimates the time remaining to complete the job.
+
+## Logging
+
+The job instance ID and work chunk ID are both available through the logback MDC and can be accessed using the `%X` specifier in a `logback.xml` file. See [Logging](/docs/appendix/logging.html#logging) for more details about logging in HAPI.
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
index 7edff129858..b6410888e20 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java
@@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
 import ca.uhn.fhir.util.Logs;
 import jakarta.annotation.Nonnull;
 import org.slf4j.Logger;
+import org.slf4j.MDC;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessagingException;
@@ -218,43 +219,49 @@ class WorkChannelMessageHandler implements MessageHandler {
 	}
 
 	private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
-		JobWorkNotification workNotification = theMessage.getPayload();
-		ourLog.info("Received work notification for {}", workNotification);
+		try {
+			JobWorkNotification workNotification = theMessage.getPayload();
+			// Load the job instance and work chunk IDs into the logging MDC context
+			BatchJobTracingContext.setBatchJobIds(workNotification.getInstanceId(), workNotification.getChunkId());
+			ourLog.info("Received work notification for {}", workNotification);
 
-		// There are three paths through this code:
-		// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
-		// 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
-		//    Probably a db rollback, with a stale queue.
-		// 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and
-		//    redeliver later.
-		//
-		// We use Optional chaining here to simplify all the cases where we short-circuit exit.
-		// A step that returns an empty Optional means discard the chunk.
-		//
-		Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
+			// There are three paths through this code:
+			// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
+			// 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
+			//    Probably a db rollback, with a stale queue.
+			// 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and
+			//    redeliver later.
+			//
+			// We use Optional chaining here to simplify all the cases where we short-circuit exit.
+			// A step that returns an empty Optional means discard the chunk.
+			//
+			Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
 
-				// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
-				Optional.of(new MessageProcess(workNotification))
-						// validate and load info
-						.flatMap(MessageProcess::validateChunkId)
-						// no job definition should be retried - we must be a stale process encountering a new
-						// job definition.
-						.flatMap(MessageProcess::loadJobDefinitionOrThrow)
-						.flatMap(MessageProcess::loadJobInstance)
-						// update statuses now in the db: QUEUED->IN_PROGRESS
-						.flatMap(MessageProcess::updateChunkStatusAndValidate)
-						.flatMap(MessageProcess::updateAndValidateJobStatus)
-						// ready to execute
-						.flatMap(MessageProcess::buildCursor)
-						.flatMap(MessageProcess::buildStepExecutor));
+					// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
+					Optional.of(new MessageProcess(workNotification))
+							// validate and load info
+							.flatMap(MessageProcess::validateChunkId)
+							// no job definition should be retried - we must be a stale process encountering a new
+							// job definition.
+							.flatMap(MessageProcess::loadJobDefinitionOrThrow)
+							.flatMap(MessageProcess::loadJobInstance)
+							// update statuses now in the db: QUEUED->IN_PROGRESS
+							.flatMap(MessageProcess::updateChunkStatusAndValidate)
+							.flatMap(MessageProcess::updateAndValidateJobStatus)
+							// ready to execute
+							.flatMap(MessageProcess::buildCursor)
+							.flatMap(MessageProcess::buildStepExecutor));
 
-		processingPreparation.ifPresentOrElse(
-				// all the setup is happy and committed. Do the work.
-				process -> process.myStepExector.executeStep(),
-				() -> {
-					// discard the chunk
-					ourLog.debug("Discarding chunk notification {}", workNotification);
-				});
+			processingPreparation.ifPresentOrElse(
+					// all the setup is happy and committed. Do the work.
+					process -> process.myStepExector.executeStep(),
+					() -> {
+						// discard the chunk
+						ourLog.debug("Discarding chunk notification {}", workNotification);
+					});
+		} finally {
+			BatchJobTracingContext.clearBatchJobsIds();
+		}
 	}
 
 	/**
@@ -279,4 +286,22 @@ class WorkChannelMessageHandler implements MessageHandler {
 			return setupProcessing;
 		});
 	}
+
+	/**
+	 * Simple wrapper around the slf4j MDC threadlocal log context.
+	 */
+	public static class BatchJobTracingContext {
+		static final String INSTANCE_ID = "instanceId";
+		static final String CHUNK_ID = "chunkId";
+
+		public static void setBatchJobIds(String theInstanceId, String theChunkId) {
+			MDC.put(INSTANCE_ID, theInstanceId);
+			MDC.put(CHUNK_ID, theChunkId);
+		}
+
+		public static void clearBatchJobsIds() {
+			MDC.remove(INSTANCE_ID);
+			MDC.remove(CHUNK_ID);
+		}
+	}
 }
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandlerTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandlerTest.java
new file mode 100644
index 00000000000..1b2c00cd42e
--- /dev/null
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandlerTest.java
@@ -0,0 +1,87 @@
+package ca.uhn.fhir.batch2.coordinator;
+
+import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
+import ca.uhn.fhir.batch2.api.IJobPersistence;
+import ca.uhn.fhir.batch2.channel.BatchJobSender;
+
+import static ca.uhn.fhir.batch2.coordinator.WorkChannelMessageHandler.*;
+
+import ca.uhn.fhir.batch2.model.JobWorkNotification;
+import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
+import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
+import ca.uhn.fhir.util.Logs;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import jakarta.annotation.Nonnull;
+
+import java.util.Collection;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import ch.qos.logback.classic.Logger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+
+class WorkChannelMessageHandlerTest extends BaseBatch2Test {
+	@Mock
+	private BatchJobSender myBatchJobSender;
+	@Mock
+	private IJobPersistence myJobInstancePersister;
+	@Mock
+	private JobDefinitionRegistry myJobDefinitionRegistry;
+	@Mock
+	private IJobMaintenanceService myJobMaintenanceService;
+	private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
+	private WorkChunkProcessor jobStepExecutorSvc;
+
+	@Mock
+	private Appender<ILoggingEvent> myAppender;
+	@Captor
+	private ArgumentCaptor<ILoggingEvent> myLoggingEvent;
+
+	@BeforeEach
+	public void beforeEach() {
+		jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService());
+	}
+
+	@Test
+	public void testWorkChannelMessageHandlerLogging_containsJobAndBatchIdInLoggingContext(){
+		// Setup
+		((Logger) Logs.getBatchTroubleshootingLog()).addAppender(myAppender);
+
+		// When
+		WorkChannelMessageHandler handler = new WorkChannelMessageHandler(myJobInstancePersister, myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService);
+		handler.handleMessage(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1)));
+
+		// Then
+		verify(myAppender, atLeastOnce()).doAppend(myLoggingEvent.capture());
+		myLoggingEvent.getAllValues()
+				.forEach(event -> {
+					Map<String, String> mdcPropertyMap = event.getMDCPropertyMap();
+					assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.CHUNK_ID, CHUNK_ID);
+					assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.INSTANCE_ID, INSTANCE_ID);
+				});
+	}
+
+	@Nonnull
+	private JobWorkNotification createWorkNotification(String theStepId) {
+		JobWorkNotification payload = new JobWorkNotification();
+		payload.setJobDefinitionId(JOB_DEFINITION_ID);
+		payload.setJobDefinitionVersion(1);
+		payload.setInstanceId(INSTANCE_ID);
+		payload.setChunkId(BaseBatch2Test.CHUNK_ID);
+		payload.setTargetStepId(theStepId);
+		return payload;
+	}
+}
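
The new "Logging" section in introduction.md above mentions the `%X` specifier; as a minimal sketch of how the two IDs can be surfaced, here is a logback.xml appender fragment. Only the MDC keys `instanceId` and `chunkId` come from `BatchJobTracingContext` in this change; the appender name and the rest of the pattern are illustrative assumptions, not part of the patch.

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
	<encoder>
		<!-- %X{instanceId} and %X{chunkId} print the batch job IDs that WorkChannelMessageHandler places in the MDC -->
		<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} [job=%X{instanceId} chunk=%X{chunkId}] %msg%n</pattern>
	</encoder>
</appender>

With a pattern like this, every message logged while a work chunk is being processed carries the owning job instance ID and work chunk ID, which is also what the new test asserts against the MDC property map of the captured logging events.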