adding additional logging to resource modified submitter svc (#5370)
* adding additional logging to resource modified submitter svc * s[otlestt * review fixes * spotless * fixing test --------- Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
This commit is contained in:
parent
20c8109a32
commit
af1d46fb04
|
@ -145,8 +145,8 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
return theStatus -> {
|
||||
boolean processed = true;
|
||||
ResourceModifiedMessage resourceModifiedMessage = null;
|
||||
try {
|
||||
|
||||
try {
|
||||
// delete the entry to lock the row to ensure unique processing
|
||||
boolean wasDeleted = deletePersistedResourceModifiedMessage(
|
||||
thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk());
|
||||
|
@ -159,15 +159,27 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
resourceModifiedMessage = optionalResourceModifiedMessage.get();
|
||||
submitResourceModified(resourceModifiedMessage);
|
||||
}
|
||||
|
||||
} catch (MessageDeliveryException exception) {
|
||||
// we encountered an issue when trying to send the message so mark the transaction for rollback
|
||||
String payloadId = "[unknown]";
|
||||
String subscriptionId = "[unknown]";
|
||||
if (resourceModifiedMessage != null) {
|
||||
payloadId = resourceModifiedMessage.getPayloadId();
|
||||
subscriptionId = resourceModifiedMessage.getSubscriptionId();
|
||||
}
|
||||
ourLog.error(
|
||||
"Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.",
|
||||
resourceModifiedMessage.getPayloadId(),
|
||||
resourceModifiedMessage.getSubscriptionId());
|
||||
payloadId,
|
||||
subscriptionId,
|
||||
exception);
|
||||
processed = false;
|
||||
theStatus.setRollbackOnly();
|
||||
} catch (Exception ex) {
|
||||
// catch other errors
|
||||
ourLog.error(
|
||||
"Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.",
|
||||
ex);
|
||||
processed = true;
|
||||
}
|
||||
|
||||
return processed;
|
||||
|
@ -179,7 +191,6 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
ResourceModifiedMessage resourceModifiedMessage = null;
|
||||
|
||||
try {
|
||||
|
||||
resourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(
|
||||
thePersistedResourceModifiedMessage);
|
||||
|
||||
|
@ -193,14 +204,17 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
persistedResourceModifiedMessagePk.getResourceVersion());
|
||||
|
||||
ourLog.warn(
|
||||
"Scheduled submission will be ignored since resource {} cannot be found", idType.asStringValue());
|
||||
"Scheduled submission will be ignored since resource {} cannot be found",
|
||||
idType.asStringValue(),
|
||||
e);
|
||||
} catch (Exception ex) {
|
||||
ourLog.error("Unknown error encountered on inflation of resources.", ex);
|
||||
}
|
||||
|
||||
return Optional.ofNullable(resourceModifiedMessage);
|
||||
}
|
||||
|
||||
private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
|
||||
|
||||
try {
|
||||
// delete the entry to lock the row to ensure unique processing
|
||||
return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK);
|
||||
|
@ -213,6 +227,9 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
// the message
|
||||
// successfully before we did.
|
||||
|
||||
return false;
|
||||
} catch (Exception ex) {
|
||||
ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package ca.uhn.fhir.jpa.subscription.svc;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
|
||||
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
|
||||
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
|
||||
|
@ -9,7 +10,12 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFact
|
|||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
|
||||
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.read.ListAppender;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -19,15 +25,24 @@ import org.mockito.ArgumentCaptor;
|
|||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.SimpleTransactionStatus;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -35,6 +50,8 @@ import static org.mockito.Mockito.when;
|
|||
@ExtendWith(MockitoExtension.class)
|
||||
public class ResourceModifiedSubmitterSvcTest {
|
||||
|
||||
private final ch.qos.logback.classic.Logger ourLogger = (Logger) LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class);
|
||||
|
||||
@Mock
|
||||
StorageSettings myStorageSettings;
|
||||
@Mock
|
||||
|
@ -46,6 +63,9 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
@Mock
|
||||
IChannelProducer myChannelProducer;
|
||||
|
||||
@Mock
|
||||
ListAppender<ILoggingEvent> myListAppender;
|
||||
|
||||
ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
|
||||
TransactionStatus myCapturingTransactionStatus;
|
||||
|
||||
|
@ -81,7 +101,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitPersisedResourceModifiedMessage_withExistingPersistedResourceModifiedMessage_willSucceed(){
|
||||
public void testSubmitPersistedResourceModifiedMessage_withExistingPersistedResourceModifiedMessage_willSucceed(){
|
||||
// given
|
||||
// a successful deletion implies that the message did exist.
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
|
||||
|
@ -94,11 +114,95 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
assertThat(wasProcessed, is(Boolean.TRUE));
|
||||
assertThat(myCapturingTransactionStatus.isRollbackOnly(), is(Boolean.FALSE));
|
||||
verify(myChannelProducer, times(1)).send(any());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitPersisedResourceModifiedMessage_whenMessageWasAlreadyProcess_willSucceed(){
|
||||
public void testSubmitPersistedResource_logsDeleteAndInflationExceptions() {
|
||||
// setup
|
||||
String deleteExMsg = "Delete Exception";
|
||||
String inflationExMsg = "Inflation Exception";
|
||||
String patientId = "/Patient/123/_history/1";
|
||||
ResourceModifiedEntity resourceModified = new ResourceModifiedEntity();
|
||||
PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK();
|
||||
rpm.setResourcePid(patientId);
|
||||
rpm.setResourceVersion("1");
|
||||
resourceModified.setResourceModifiedEntityPK(rpm);
|
||||
|
||||
ourLogger.addAppender(myListAppender);
|
||||
ourLogger.setLevel(Level.ERROR);
|
||||
|
||||
// when
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any()))
|
||||
.thenThrow(new RuntimeException(deleteExMsg));
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any()))
|
||||
.thenThrow(new RuntimeException(inflationExMsg));
|
||||
|
||||
// test
|
||||
boolean processed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(resourceModified);
|
||||
|
||||
// verify
|
||||
assertTrue(processed);
|
||||
|
||||
ArgumentCaptor<ILoggingEvent> logEvent = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
verify(myListAppender, atLeast(2))
|
||||
.doAppend(logEvent.capture());
|
||||
|
||||
List<ILoggingEvent> logs = logEvent.getAllValues();
|
||||
boolean hasDeleteException = false;
|
||||
boolean hasInflationException = false;
|
||||
for (ILoggingEvent log : logs) {
|
||||
if (log.getThrowableProxy().getMessage().contains(deleteExMsg)) {
|
||||
hasDeleteException = true;
|
||||
}
|
||||
if (log.getThrowableProxy().getMessage().contains(inflationExMsg)) {
|
||||
hasInflationException = true;
|
||||
}
|
||||
}
|
||||
assertTrue(hasDeleteException);
|
||||
assertTrue(hasInflationException);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitPersistedResource_withMissingResource_processes() {
|
||||
// setup
|
||||
String patientId = "Patient/123";
|
||||
String exceptionString = "A random exception";
|
||||
ResourceModifiedEntity resourceModified = new ResourceModifiedEntity();
|
||||
PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK();
|
||||
rpm.setResourcePid(patientId);
|
||||
rpm.setResourceVersion("1");
|
||||
resourceModified.setResourceModifiedEntityPK(rpm);
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
|
||||
ourLogger.addAppender(myListAppender);
|
||||
ourLogger.setLevel(Level.ERROR);
|
||||
|
||||
// when
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any()))
|
||||
.thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any()))
|
||||
.thenReturn(msg);
|
||||
when(myChannelProducer.send(any()))
|
||||
.thenThrow(new RuntimeException(exceptionString));
|
||||
|
||||
// test
|
||||
boolean processed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(resourceModified);
|
||||
|
||||
// then
|
||||
assertTrue(processed);
|
||||
|
||||
// verify
|
||||
verify(myChannelProducer)
|
||||
.send(any());
|
||||
ArgumentCaptor<ILoggingEvent> loggingCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
verify(myListAppender).doAppend(loggingCaptor.capture());
|
||||
ILoggingEvent event = loggingCaptor.getValue();
|
||||
assertNotNull(event);
|
||||
assertTrue(event.getThrowableProxy().getMessage().contains(exceptionString));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitPersistedResourceModifiedMessage_whenMessageWasAlreadyProcess_willSucceed(){
|
||||
// given
|
||||
// deletion fails, someone else was faster and processed the message
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(false);
|
||||
|
@ -116,7 +220,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitPersisedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){
|
||||
public void testSubmitPersistedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){
|
||||
// given
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
|
||||
|
|
Loading…
Reference in New Issue