mirror of https://github.com/hapifhir/hapi-fhir.git
add batch and chunk ids to logging context (#6268)
* add batch and chunk id to logging mdc context, test
* changelogs
* spotless
* address code review comments
* Update changelog from code review comment
Co-authored-by: Tadgh <garygrantgraham@gmail.com>

---------

Co-authored-by: jdar <justin.dar@smiledigitalhealth.com>
Co-authored-by: Tadgh <garygrantgraham@gmail.com>
parent 0daa1a0fde
commit 6a365f8722
@@ -0,0 +1,6 @@
---
type: add
issue: 6210
jira: SMILE-8428
title: "Batch instance ID and chunk ID have been added to the logging context so that they can be automatically added to
  batch-related messages in the log."
@@ -70,3 +70,7 @@ If a Job Definition is set to having Gated Execution, then all work chunks for a
### Job Instance Completion

A Batch Job Maintenance Service runs every minute to monitor the status of all Job Instances, and each Job Instance is transitioned to either `COMPLETED`, `ERRORED`, or `FAILED` according to the status of all outstanding work chunks for that job instance. If the job instance is still `IN_PROGRESS`, this maintenance service also estimates the time remaining to complete the job.
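The roll-up rule above is described only at a high level, so here is a purely illustrative Java sketch of one plausible reading of it. This is not part of the commit and not HAPI's actual maintenance code; the class, enum, and method names and the exact precedence are invented for the example.

```java
import java.util.Set;

// Hypothetical sketch only: rolls a set of work-chunk statuses up into a single
// job-instance status, mirroring the description above. Names and precedence here
// are assumptions for illustration, not HAPI's implementation.
class InstanceStatusSketch {

	enum Status { QUEUED, IN_PROGRESS, COMPLETED, ERRORED, FAILED }

	static Status rollUp(Set<Status> theChunkStatuses) {
		if (theChunkStatuses.contains(Status.FAILED)) {
			return Status.FAILED; // an unrecoverable chunk fails the whole instance
		}
		if (theChunkStatuses.contains(Status.QUEUED) || theChunkStatuses.contains(Status.IN_PROGRESS)) {
			return Status.IN_PROGRESS; // work is still outstanding; time remaining would be estimated here
		}
		if (theChunkStatuses.contains(Status.ERRORED)) {
			return Status.ERRORED; // all chunks finished, but some reported errors
		}
		return Status.COMPLETED; // every chunk completed cleanly
	}
}
```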
## Logging

The job instance ID and work chunk ID are both available through the logback MDC and can be accessed using the `%X` specifier in a `logback.xml` file. See [Logging](/docs/appendix/logging.html#logging) for more details about logging in HAPI.
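To make the `%X` mechanism concrete, the following self-contained demo (not part of this commit) shows the two MDC keys introduced by this change, `instanceId` and `chunkId`, being set, logged under, and cleared. The logger name, the sample ID values, and the pattern quoted in the comment are assumptions for illustration.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// Hypothetical demo of the MDC keys used by this change ("instanceId", "chunkId").
public class MdcLoggingDemo {
	private static final Logger ourLog = LoggerFactory.getLogger(MdcLoggingDemo.class);

	public static void main(String[] theArgs) {
		// Populate the MDC the same way BatchJobTracingContext.setBatchJobIds(..) does.
		MDC.put("instanceId", "example-instance-id");
		MDC.put("chunkId", "example-chunk-id");
		try {
			// With a logback.xml pattern along the lines of
			//   <pattern>%d %-5level %logger [%X{instanceId}/%X{chunkId}] %msg%n</pattern>
			// this message is rendered with both IDs attached automatically.
			ourLog.info("Processing work chunk");
		} finally {
			// Clear the keys so they do not leak onto unrelated messages logged later
			// by this (possibly pooled) thread.
			MDC.remove("instanceId");
			MDC.remove("chunkId");
		}
	}
}
```

The try/finally shape intentionally mirrors what `handleWorkChannelMessage` does in the diff below: the IDs are put into the MDC before the first log statement and removed in a `finally` block so they never bleed onto unrelated messages from the same worker thread.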
@@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@@ -218,43 +219,49 @@ class WorkChannelMessageHandler implements MessageHandler {
    }

    private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
        try {
            JobWorkNotification workNotification = theMessage.getPayload();

            // Load the job instance and work chunk IDs into the logging MDC context
            BatchJobTracingContext.setBatchJobIds(workNotification.getInstanceId(), workNotification.getChunkId());

            ourLog.info("Received work notification for {}", workNotification);

            // There are three paths through this code:
            // 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
            // 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
            //    Probably a db rollback, with a stale queue.
            // 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and
            //    redeliver later.
            //
            // We use Optional chaining here to simplify all the cases where we short-circuit exit.
            // A step that returns an empty Optional means discard the chunk.
            //
            Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
                    // Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
                    Optional.of(new MessageProcess(workNotification))
                            // validate and load info
                            .flatMap(MessageProcess::validateChunkId)
                            // no job definition should be retried - we must be a stale process encountering a new
                            // job definition.
                            .flatMap(MessageProcess::loadJobDefinitionOrThrow)
                            .flatMap(MessageProcess::loadJobInstance)
                            // update statuses now in the db: QUEUED->IN_PROGRESS
                            .flatMap(MessageProcess::updateChunkStatusAndValidate)
                            .flatMap(MessageProcess::updateAndValidateJobStatus)
                            // ready to execute
                            .flatMap(MessageProcess::buildCursor)
                            .flatMap(MessageProcess::buildStepExecutor));

            processingPreparation.ifPresentOrElse(
                    // all the setup is happy and committed. Do the work.
                    process -> process.myStepExector.executeStep(),
                    () -> {
                        // discard the chunk
                        ourLog.debug("Discarding chunk notification {}", workNotification);
                    });
        } finally {
            BatchJobTracingContext.clearBatchJobsIds();
        }
    }

    /**
@@ -279,4 +286,22 @@ class WorkChannelMessageHandler implements MessageHandler {
            return setupProcessing;
        });
    }

    /**
     * Simple wrapper around the slf4j MDC threadlocal log context.
     */
    public static class BatchJobTracingContext {
        static final String INSTANCE_ID = "instanceId";
        static final String CHUNK_ID = "chunkId";

        public static void setBatchJobIds(String theInstanceId, String theChunkId) {
            MDC.put(INSTANCE_ID, theInstanceId);
            MDC.put(CHUNK_ID, theChunkId);
        }

        public static void clearBatchJobsIds() {
            MDC.remove(INSTANCE_ID);
            MDC.remove(CHUNK_ID);
        }
    }
}
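As an aside on the design, the explicit put/remove pairing in `BatchJobTracingContext` could also be expressed with slf4j's `MDC.putCloseable()` and try-with-resources. The sketch below is an alternative shape for illustration only, not what this commit does; the class and method names are invented.

```java
import org.slf4j.MDC;

// Alternative sketch (not the commit's approach): MDC.putCloseable() scopes the keys
// with try-with-resources, so each key is removed automatically when the block exits.
class TracingContextAlternative {
	static void runWithIds(String theInstanceId, String theChunkId, Runnable theWork) {
		try (MDC.MDCCloseable ignoredInstance = MDC.putCloseable("instanceId", theInstanceId);
				MDC.MDCCloseable ignoredChunk = MDC.putCloseable("chunkId", theChunkId)) {
			theWork.run();
		}
	}
}
```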
@@ -0,0 +1,87 @@
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;

import static ca.uhn.fhir.batch2.coordinator.WorkChannelMessageHandler.*;

import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.util.Logs;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import jakarta.annotation.Nonnull;

import java.util.Collection;

import java.util.Map;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import ch.qos.logback.classic.Logger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;

import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

class WorkChannelMessageHandlerTest extends BaseBatch2Test {
    @Mock
    private BatchJobSender myBatchJobSender;
    @Mock
    private IJobPersistence myJobInstancePersister;
    @Mock
    private JobDefinitionRegistry myJobDefinitionRegistry;
    @Mock
    private IJobMaintenanceService myJobMaintenanceService;
    private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
    private WorkChunkProcessor jobStepExecutorSvc;

    @Mock
    private Appender<ILoggingEvent> myAppender;
    @Captor
    private ArgumentCaptor<ILoggingEvent> myLoggingEvent;

    @BeforeEach
    public void beforeEach() {
        jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService());
    }

    @Test
    public void testWorkChannelMessageHandlerLogging_containsJobAndBatchIdInLoggingContext(){
        // Setup
        ((Logger) Logs.getBatchTroubleshootingLog()).addAppender(myAppender);

        // When
        WorkChannelMessageHandler handler = new WorkChannelMessageHandler(myJobInstancePersister, myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService);
        handler.handleMessage(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1)));

        // Then
        verify(myAppender, atLeastOnce()).doAppend(myLoggingEvent.capture());
        myLoggingEvent.getAllValues()
                .forEach(event -> {
                    Map<String, String> mdcPropertyMap = event.getMDCPropertyMap();
                    assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.CHUNK_ID, CHUNK_ID);
                    assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.INSTANCE_ID, INSTANCE_ID);
                });
    }

    @Nonnull
    private JobWorkNotification createWorkNotification(String theStepId) {
        JobWorkNotification payload = new JobWorkNotification();
        payload.setJobDefinitionId(JOB_DEFINITION_ID);
        payload.setJobDefinitionVersion(1);
        payload.setInstanceId(INSTANCE_ID);
        payload.setChunkId(BaseBatch2Test.CHUNK_ID);
        payload.setTargetStepId(theStepId);
        return payload;
    }
}