From af1d46fb04b2b0aac5dd9d2ab0431ff6a59751a5 Mon Sep 17 00:00:00 2001 From: TipzCM Date: Fri, 20 Oct 2023 09:56:32 -0400 Subject: [PATCH] adding additional logging to resource modified submitter svc (#5370) * adding additional logging to resource modified submitter svc * s[otlestt * review fixes * spotless * fixing test --------- Co-authored-by: leif stawnyczy --- .../svc/ResourceModifiedSubmitterSvc.java | 31 +++-- .../svc/ResourceModifiedSubmitterSvcTest.java | 112 +++++++++++++++++- 2 files changed, 132 insertions(+), 11 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java index 7d768beefce..99d291959ad 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java @@ -145,8 +145,8 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, return theStatus -> { boolean processed = true; ResourceModifiedMessage resourceModifiedMessage = null; - try { + try { // delete the entry to lock the row to ensure unique processing boolean wasDeleted = deletePersistedResourceModifiedMessage( thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk()); @@ -159,15 +159,27 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, resourceModifiedMessage = optionalResourceModifiedMessage.get(); submitResourceModified(resourceModifiedMessage); } - } catch (MessageDeliveryException exception) { // we encountered an issue when trying to send the message so mark the transaction for rollback + String payloadId = "[unknown]"; + String subscriptionId = "[unknown]"; + if (resourceModifiedMessage != null) { + payloadId = resourceModifiedMessage.getPayloadId(); + subscriptionId = resourceModifiedMessage.getSubscriptionId(); + } ourLog.error( "Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.", - resourceModifiedMessage.getPayloadId(), - resourceModifiedMessage.getSubscriptionId()); + payloadId, + subscriptionId, + exception); processed = false; theStatus.setRollbackOnly(); + } catch (Exception ex) { + // catch other errors + ourLog.error( + "Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.", + ex); + processed = true; } return processed; @@ -179,7 +191,6 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, ResourceModifiedMessage resourceModifiedMessage = null; try { - resourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage( thePersistedResourceModifiedMessage); @@ -193,14 +204,17 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, persistedResourceModifiedMessagePk.getResourceVersion()); ourLog.warn( - "Scheduled submission will be ignored since resource {} cannot be found", idType.asStringValue()); + "Scheduled submission will be ignored since resource {} cannot be found", + idType.asStringValue(), + e); + } catch (Exception ex) { + ourLog.error("Unknown error encountered on inflation of resources.", ex); } return Optional.ofNullable(resourceModifiedMessage); } private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) { - try { // delete the entry to lock the row to ensure unique processing return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK); @@ -213,6 +227,9 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, // the message // successfully before we did. + return false; + } catch (Exception ex) { + ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex); return false; } } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java index 872ec955c81..21d1f97c686 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.subscription.svc; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK; import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity; import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; @@ -9,7 +10,12 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFact import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc; import ca.uhn.fhir.jpa.svc.MockHapiTransactionService; +import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -19,15 +25,24 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.LoggerFactory; import org.springframework.messaging.MessageDeliveryException; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.SimpleTransactionStatus; +import java.util.List; +import java.util.Optional; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -35,6 +50,8 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class ResourceModifiedSubmitterSvcTest { + private final ch.qos.logback.classic.Logger ourLogger = (Logger) LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class); + @Mock StorageSettings myStorageSettings; @Mock @@ -46,6 +63,9 @@ public class ResourceModifiedSubmitterSvcTest { @Mock IChannelProducer myChannelProducer; + @Mock + ListAppender myListAppender; + ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc; TransactionStatus myCapturingTransactionStatus; @@ -81,7 +101,7 @@ public class ResourceModifiedSubmitterSvcTest { } @Test - public void testSubmitPersisedResourceModifiedMessage_withExistingPersistedResourceModifiedMessage_willSucceed(){ + public void testSubmitPersistedResourceModifiedMessage_withExistingPersistedResourceModifiedMessage_willSucceed(){ // given // a successful deletion implies that the message did exist. when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true); @@ -94,11 +114,95 @@ public class ResourceModifiedSubmitterSvcTest { assertThat(wasProcessed, is(Boolean.TRUE)); assertThat(myCapturingTransactionStatus.isRollbackOnly(), is(Boolean.FALSE)); verify(myChannelProducer, times(1)).send(any()); - } @Test - public void testSubmitPersisedResourceModifiedMessage_whenMessageWasAlreadyProcess_willSucceed(){ + public void testSubmitPersistedResource_logsDeleteAndInflationExceptions() { + // setup + String deleteExMsg = "Delete Exception"; + String inflationExMsg = "Inflation Exception"; + String patientId = "/Patient/123/_history/1"; + ResourceModifiedEntity resourceModified = new ResourceModifiedEntity(); + PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK(); + rpm.setResourcePid(patientId); + rpm.setResourceVersion("1"); + resourceModified.setResourceModifiedEntityPK(rpm); + + ourLogger.addAppender(myListAppender); + ourLogger.setLevel(Level.ERROR); + + // when + when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())) + .thenThrow(new RuntimeException(deleteExMsg)); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())) + .thenThrow(new RuntimeException(inflationExMsg)); + + // test + boolean processed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(resourceModified); + + // verify + assertTrue(processed); + + ArgumentCaptor logEvent = ArgumentCaptor.forClass(ILoggingEvent.class); + verify(myListAppender, atLeast(2)) + .doAppend(logEvent.capture()); + + List logs = logEvent.getAllValues(); + boolean hasDeleteException = false; + boolean hasInflationException = false; + for (ILoggingEvent log : logs) { + if (log.getThrowableProxy().getMessage().contains(deleteExMsg)) { + hasDeleteException = true; + } + if (log.getThrowableProxy().getMessage().contains(inflationExMsg)) { + hasInflationException = true; + } + } + assertTrue(hasDeleteException); + assertTrue(hasInflationException); + } + + @Test + public void testSubmitPersistedResource_withMissingResource_processes() { + // setup + String patientId = "Patient/123"; + String exceptionString = "A random exception"; + ResourceModifiedEntity resourceModified = new ResourceModifiedEntity(); + PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK(); + rpm.setResourcePid(patientId); + rpm.setResourceVersion("1"); + resourceModified.setResourceModifiedEntityPK(rpm); + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + + ourLogger.addAppender(myListAppender); + ourLogger.setLevel(Level.ERROR); + + // when + when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())) + .thenReturn(true); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())) + .thenReturn(msg); + when(myChannelProducer.send(any())) + .thenThrow(new RuntimeException(exceptionString)); + + // test + boolean processed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(resourceModified); + + // then + assertTrue(processed); + + // verify + verify(myChannelProducer) + .send(any()); + ArgumentCaptor loggingCaptor = ArgumentCaptor.forClass(ILoggingEvent.class); + verify(myListAppender).doAppend(loggingCaptor.capture()); + ILoggingEvent event = loggingCaptor.getValue(); + assertNotNull(event); + assertTrue(event.getThrowableProxy().getMessage().contains(exceptionString)); + } + + @Test + public void testSubmitPersistedResourceModifiedMessage_whenMessageWasAlreadyProcess_willSucceed(){ // given // deletion fails, someone else was faster and processed the message when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(false); @@ -116,7 +220,7 @@ public class ResourceModifiedSubmitterSvcTest { } @Test - public void testSubmitPersisedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){ + public void testSubmitPersistedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){ // given when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true); when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());