Compare commits

...

8 Commits

Author SHA1 Message Date
jdar8 0fcd08cc1d
Merge e148c76a09 into 3f6d1eb29b 2024-09-26 02:08:11 +00:00
Thomas Papke 3f6d1eb29b
#5768 Upgrade to latest simple-java-mail (#6261) 2024-09-26 02:07:27 +00:00
jdar e148c76a09 Merge branch 'master' into jd-20240905-add-batch-and-chunk-ids-to-logging-context 2024-09-25 16:20:32 -07:00
jdar 9f4630db06 address code review comments 2024-09-25 16:20:08 -07:00
Tadgh 377e44b6ca
attribution and pom change (#6309) 2024-09-25 20:38:22 +00:00
jdar f2150dd947 spotless 2024-09-06 11:55:17 -07:00
jdar c91bee21e9 changelogs 2024-09-05 17:09:42 -07:00
jdar ebb256c5d2 add batch and chunk id to logging mdc context, test 2024-09-05 17:02:18 -07:00
14 changed files with 219 additions and 89 deletions

View File

@ -0,0 +1,6 @@
---
type: add
issue: 6210
jira: SMILE-8428
title: "The batch instance and chunk IDs have been added to the logging MDC so that the IDs can be automatically added to
batch-related messages in the log."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 6290
title: "Previously, a specific migration task was using the `TRIM()` function, which does not exist in MSSQL 2012. This was causing migrations targeting MSSQL 2012 to fail.
This has been corrected and replaced with usage of a combination of LTRIM() and RTRIM(). Thanks to Primož Delopst at Better for the contribution!"

View File

@ -70,3 +70,7 @@ If a Job Definition is set to having Gated Execution, then all work chunks for a
### Job Instance Completion
A Batch Job Maintenance Service runs every minute to monitor the status of all Job Instances and the Job Instance is transitioned to either `COMPLETED`, `ERRORED` or `FAILED` according to the status of all outstanding work chunks for that job instance. If the job instance is still `IN_PROGRESS` this maintenance service also estimates the time remaining to complete the job.
## Logging
The job instance ID and work chunk ID are both available through the logback MDC and can be accessed using the `%X` specifier in a `logback.xml` file. See [Logging](/docs/appendix/logging.html#logging) for more details about logging in HAPI.

View File

@ -70,6 +70,11 @@
<artifactId>jakarta.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
<optional>true</optional>
</dependency>
<!-- test dependencies -->
<dependency>

View File

@ -28,8 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

View File

@ -17,8 +17,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;

View File

@ -26,8 +26,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

View File

@ -79,25 +79,11 @@
<dependency>
<groupId>org.simplejavamail</groupId>
<artifactId>simple-java-mail</artifactId>
<!-- Excluded in favor of jakarta.activation:jakarta.activation-api -->
<exclusions>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail</artifactId>
<scope>compile</scope>
<!-- Excluded in favor of jakarta.activation:jakarta.activation-api -->
<exclusions>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@ -21,9 +21,9 @@ package ca.uhn.fhir.rest.server.mail;
import jakarta.annotation.Nonnull;
import org.simplejavamail.api.email.Email;
import org.simplejavamail.api.mailer.AsyncResponse;
import java.util.List;
import java.util.function.Consumer;
public interface IMailSvc {
void sendMail(@Nonnull List<Email> theEmails);
@ -31,7 +31,5 @@ public interface IMailSvc {
void sendMail(@Nonnull Email theEmail);
void sendMail(
@Nonnull Email theEmail,
@Nonnull Runnable theOnSuccess,
@Nonnull AsyncResponse.ExceptionConsumer theErrorHandler);
@Nonnull Email theEmail, @Nonnull Runnable theOnSuccess, @Nonnull Consumer<Throwable> theErrorHandler);
}

View File

@ -20,12 +20,9 @@
package ca.uhn.fhir.rest.server.mail;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.simplejavamail.MailException;
import org.simplejavamail.api.email.Email;
import org.simplejavamail.api.email.Recipient;
import org.simplejavamail.api.mailer.AsyncResponse;
import org.simplejavamail.api.mailer.AsyncResponse.ExceptionConsumer;
import org.simplejavamail.api.mailer.Mailer;
import org.simplejavamail.api.mailer.config.TransportStrategy;
import org.simplejavamail.mailer.MailerBuilder;
@ -33,6 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class MailSvc implements IMailSvc {
@ -42,14 +41,14 @@ public class MailSvc implements IMailSvc {
private final Mailer myMailer;
public MailSvc(@Nonnull MailConfig theMailConfig) {
Validate.notNull(theMailConfig);
Objects.requireNonNull(theMailConfig);
myMailConfig = theMailConfig;
myMailer = makeMailer(myMailConfig);
}
@Override
public void sendMail(@Nonnull List<Email> theEmails) {
Validate.notNull(theEmails);
Objects.requireNonNull(theEmails);
theEmails.forEach(theEmail -> send(theEmail, new OnSuccess(theEmail), new ErrorHandler(theEmail)));
}
@ -60,21 +59,23 @@ public class MailSvc implements IMailSvc {
@Override
public void sendMail(
@Nonnull Email theEmail, @Nonnull Runnable theOnSuccess, @Nonnull ExceptionConsumer theErrorHandler) {
@Nonnull Email theEmail, @Nonnull Runnable theOnSuccess, @Nonnull Consumer<Throwable> theErrorHandler) {
send(theEmail, theOnSuccess, theErrorHandler);
}
private void send(
@Nonnull Email theEmail, @Nonnull Runnable theOnSuccess, @Nonnull ExceptionConsumer theErrorHandler) {
Validate.notNull(theEmail);
Validate.notNull(theOnSuccess);
Validate.notNull(theErrorHandler);
@Nonnull Email theEmail, @Nonnull Runnable theOnSuccess, @Nonnull Consumer<Throwable> theErrorHandler) {
Objects.requireNonNull(theEmail);
Objects.requireNonNull(theOnSuccess);
Objects.requireNonNull(theErrorHandler);
try {
final AsyncResponse asyncResponse = myMailer.sendMail(theEmail, true);
if (asyncResponse != null) {
asyncResponse.onSuccess(theOnSuccess);
asyncResponse.onException(theErrorHandler);
}
myMailer.sendMail(theEmail, true).whenComplete((result, ex) -> {
if (ex != null) {
theErrorHandler.accept(ex);
} else {
theOnSuccess.run();
}
});
} catch (MailException e) {
theErrorHandler.accept(e);
}
@ -117,7 +118,7 @@ public class MailSvc implements IMailSvc {
}
}
private class ErrorHandler implements ExceptionConsumer {
private class ErrorHandler implements Consumer<Throwable> {
private final Email myEmail;
private ErrorHandler(@Nonnull Email theEmail) {
@ -125,7 +126,7 @@ public class MailSvc implements IMailSvc {
}
@Override
public void accept(Exception t) {
public void accept(Throwable t) {
ourLog.error("Email not sent" + makeMessage(myEmail), t);
}
}

View File

@ -4,6 +4,7 @@ import com.icegreen.greenmail.junit5.GreenMailExtension;
import com.icegreen.greenmail.util.GreenMailUtil;
import com.icegreen.greenmail.util.ServerSetupTest;
import jakarta.annotation.Nonnull;
import jakarta.mail.internet.MimeMessage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -11,7 +12,6 @@ import org.simplejavamail.MailException;
import org.simplejavamail.api.email.Email;
import org.simplejavamail.email.EmailBuilder;
import javax.mail.internet.MimeMessage;
import java.util.Arrays;
import java.util.List;
@ -86,13 +86,14 @@ public class MailSvcIT {
@Test
public void testSendMailWithInvalidToAddressExpectErrorHandler() {
// setup
final Email email = withEmail("xyz");
String invalidEmailAdress = "xyz";
final Email email = withEmail(invalidEmailAdress);
// execute
fixture.sendMail(email,
() -> fail("Should not execute on Success"),
(e) -> {
assertTrue(e instanceof MailException);
assertEquals("Invalid TO address: " + email, e.getMessage());
assertEquals("Invalid TO address: " + invalidEmailAdress, e.getMessage());
});
// validate
assertTrue(ourGreenMail.waitForIncomingEmail(1000, 0));

View File

@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@ -218,43 +219,49 @@ class WorkChannelMessageHandler implements MessageHandler {
}
private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
JobWorkNotification workNotification = theMessage.getPayload();
ourLog.info("Received work notification for {}", workNotification);
try {
JobWorkNotification workNotification = theMessage.getPayload();
// Load the job instance and work chunk IDs into the logging MDC context
BatchJobTracingContext.setBatchJobIds(workNotification.getInstanceId(), workNotification.getChunkId());
ourLog.info("Received work notification for {}", workNotification);
// There are three paths through this code:
// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
// 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
// Probably a db rollback, with a stale queue.
// 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and
// redeliver later.
//
// We use Optional chaining here to simplify all the cases where we short-circuit exit.
// A step that returns an empty Optional means discard the chunk.
//
Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
// There are three paths through this code:
// 1. Normal execution. We validate, load, update statuses, all in a tx. Then we process the chunk.
// 2. Discard chunk. If some validation fails (e.g. no chunk with that id), we log and discard the chunk.
// Probably a db rollback, with a stale queue.
// 3. Fail and retry. If we throw an exception out of here, Spring will put the queue message back, and
// redeliver later.
//
// We use Optional chaining here to simplify all the cases where we short-circuit exit.
// A step that returns an empty Optional means discard the chunk.
//
Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
Optional.of(new MessageProcess(workNotification))
// validate and load info
.flatMap(MessageProcess::validateChunkId)
// no job definition should be retried - we must be a stale process encountering a new
// job definition.
.flatMap(MessageProcess::loadJobDefinitionOrThrow)
.flatMap(MessageProcess::loadJobInstance)
// update statuses now in the db: QUEUED->IN_PROGRESS
.flatMap(MessageProcess::updateChunkStatusAndValidate)
.flatMap(MessageProcess::updateAndValidateJobStatus)
// ready to execute
.flatMap(MessageProcess::buildCursor)
.flatMap(MessageProcess::buildStepExecutor));
// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
Optional.of(new MessageProcess(workNotification))
// validate and load info
.flatMap(MessageProcess::validateChunkId)
// no job definition should be retried - we must be a stale process encountering a new
// job definition.
.flatMap(MessageProcess::loadJobDefinitionOrThrow)
.flatMap(MessageProcess::loadJobInstance)
// update statuses now in the db: QUEUED->IN_PROGRESS
.flatMap(MessageProcess::updateChunkStatusAndValidate)
.flatMap(MessageProcess::updateAndValidateJobStatus)
// ready to execute
.flatMap(MessageProcess::buildCursor)
.flatMap(MessageProcess::buildStepExecutor));
processingPreparation.ifPresentOrElse(
// all the setup is happy and committed. Do the work.
process -> process.myStepExector.executeStep(),
() -> {
// discard the chunk
ourLog.debug("Discarding chunk notification {}", workNotification);
});
processingPreparation.ifPresentOrElse(
// all the setup is happy and committed. Do the work.
process -> process.myStepExector.executeStep(),
() -> {
// discard the chunk
ourLog.debug("Discarding chunk notification {}", workNotification);
});
} finally {
BatchJobTracingContext.clearBatchJobsIds();
}
}
/**
@ -279,4 +286,22 @@ class WorkChannelMessageHandler implements MessageHandler {
return setupProcessing;
});
}
/**
* Simple wrapper around the slf4j MDC threadlocal log context.
*/
public static class BatchJobTracingContext {
static final String INSTANCE_ID = "instanceId";
static final String CHUNK_ID = "chunkId";
public static void setBatchJobIds(String theInstanceId, String theChunkId) {
MDC.put(INSTANCE_ID, theInstanceId);
MDC.put(CHUNK_ID, theChunkId);
}
public static void clearBatchJobsIds() {
MDC.remove(INSTANCE_ID);
MDC.remove(CHUNK_ID);
}
}
}

View File

@ -0,0 +1,87 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import static ca.uhn.fhir.batch2.coordinator.WorkChannelMessageHandler.*;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.util.Logs;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import jakarta.annotation.Nonnull;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import ch.qos.logback.classic.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
class WorkChannelMessageHandlerTest extends BaseBatch2Test {
@Mock
private BatchJobSender myBatchJobSender;
@Mock
private IJobPersistence myJobInstancePersister;
@Mock
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IJobMaintenanceService myJobMaintenanceService;
private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService();
private WorkChunkProcessor jobStepExecutorSvc;
@Mock
private Appender<ILoggingEvent> myAppender;
@Captor
private ArgumentCaptor<ILoggingEvent> myLoggingEvent;
@BeforeEach
public void beforeEach() {
jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService());
}
@Test
public void testWorkChannelMessageHandlerLogging_containsJobAndBatchIdInLoggingContext(){
// Setup
((Logger) Logs.getBatchTroubleshootingLog()).addAppender(myAppender);
// When
WorkChannelMessageHandler handler = new WorkChannelMessageHandler(myJobInstancePersister, myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService);
handler.handleMessage(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1)));
// Then
verify(myAppender, atLeastOnce()).doAppend(myLoggingEvent.capture());
myLoggingEvent.getAllValues()
.forEach(event -> {
Map<String, String> mdcPropertyMap = event.getMDCPropertyMap();
assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.CHUNK_ID, CHUNK_ID);
assertThat(mdcPropertyMap).containsEntry(BatchJobTracingContext.INSTANCE_ID, INSTANCE_ID);
});
}
@Nonnull
private JobWorkNotification createWorkNotification(String theStepId) {
JobWorkNotification payload = new JobWorkNotification();
payload.setJobDefinitionId(JOB_DEFINITION_ID);
payload.setJobDefinitionVersion(1);
payload.setInstanceId(INSTANCE_ID);
payload.setChunkId(BaseBatch2Test.CHUNK_ID);
payload.setTargetStepId(theStepId);
return payload;
}
}

34
pom.xml
View File

@ -869,6 +869,7 @@
<developer>
<id>delopst</id>
<name>Primož Delopst</name>
<organization>Better</organization>
</developer>
<developer>
<id>Zach Smith</id>
@ -1160,27 +1161,38 @@
<dependency>
<groupId>org.simplejavamail</groupId>
<artifactId>simple-java-mail</artifactId>
<version>6.6.1</version>
<version>8.11.2</version>
<exclusions>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<groupId>com.github.bbottema</groupId>
<artifactId>jetbrains-runtime-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail</artifactId>
<version>2.1.0-rc-1</version>
<exclusions>
<exclusion>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail-junit5</artifactId>
<version>1.6.4</version>
<version>2.1.0-rc-1</version>
<scope>compile</scope>
</dependency>
<!-- mail end -->