From 0831e0f05ede6b212d644a3534fee51e94d2f6a3 Mon Sep 17 00:00:00 2001 From: TynerGjs <132295567+TynerGjs@users.noreply.github.com> Date: Mon, 6 Nov 2023 17:12:06 -0500 Subject: [PATCH] Resolve We don't have guaranteed subscription delivery if a resource is too large (#5414) * first fix * - added the ability to handle null payload to SubscriptionDeliveringMessageSubscriber and SubscriptionDeliveringEmailSubscriber - refactored code to reduce repeated code - cleaned unnecessary comments and reformatted files * Changed myResourceModifiedMessagePersistenceSvc to be autowired * removed unused import * added error handling when inflating the message to email and message subscriber * reformatted code * Fixing subscription tests with mocked IResourceModifiedMessagePersistenceSvc * Changes by gary * Reformatted file * fixed failed tests * implemented test for message and email delivery subscriber. Fixed logical error. Reformatted File. * - implemented IT - fixed logical error - added changelog * fix for cdr tests, NOTE: this makes the assumption that we will always succeed for inflating the database in the tests that uses SynchronousSubscriptionMatcherInterceptor * fix for cdr tests, NOTE: this makes the assumption that we will always succeed for inflating the database in the tests that uses SynchronousSubscriptionMatcherInterceptor * resolve code review comments * reformatted files * fixed tests --- ...n-delivery-if-a-resource-is-too-large.yaml | 7 +++ ...urceModifiedMessagePersistenceSvcImpl.java | 56 ++++++++++++++++--- .../BaseSubscriptionDeliverySubscriber.java | 19 +++++++ ...SubscriptionDeliveringEmailSubscriber.java | 24 +++++++- ...bscriptionDeliveringMessageSubscriber.java | 16 +++++- .../SubscriptionActivatingSubscriber.java | 16 +++++- .../SubscriptionMatchDeliverer.java | 18 +++++- .../SubscriptionMatchingSubscriber.java | 15 +++++ ...hronousSubscriptionMatcherInterceptor.java | 26 ++++++++- .../svc/ResourceModifiedSubmitterSvc.java | 40 +++---------- ...aseSubscriptionDeliverySubscriberTest.java | 53 ++++++++++++++++++ .../matching/DaoSubscriptionMatcherTest.java | 6 ++ .../cache/SubscriptionRegistrySharedTest.java | 5 ++ .../config/TestSubscriptionDstu3Config.java | 6 ++ ...kingQueueSubscribableChannelDstu3Test.java | 9 +++ .../SubscriptionMatchingSubscriberTest.java | 8 +++ .../WebsocketConnectionValidatorTest.java | 6 ++ .../message/MessageSubscriptionR4Test.java | 20 ++++--- .../svc/ResourceModifiedSubmitterSvcTest.java | 10 ++-- .../BaseResourceModifiedMessage.java | 12 +++- .../channel/api/PayloadTooLargeException.java | 16 ++++++ .../model/ResourceDeliveryMessage.java | 4 ++ .../model/ResourceModifiedMessage.java | 10 ++++ ...ResourceModifiedMessagePersistenceSvc.java | 24 +++++++- 24 files changed, 359 insertions(+), 67 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5407-we-dont-have-guaranteed-subscription-delivery-if-a-resource-is-too-large.yaml create mode 100644 hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/PayloadTooLargeException.java diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5407-we-dont-have-guaranteed-subscription-delivery-if-a-resource-is-too-large.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5407-we-dont-have-guaranteed-subscription-delivery-if-a-resource-is-too-large.yaml new file mode 100644 index 00000000000..e7f8d7ef0be --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5407-we-dont-have-guaranteed-subscription-delivery-if-a-resource-is-too-large.yaml @@ -0,0 +1,7 @@ +--- +type: add +issue: 5407 +title: "Previously, when the payload of a subscription message exceeds the broker maximum message size, exception would +be thrown and retry will be performed indefinitely until the maximum message size is adjusted. Now, the message will be +successfully delivered for rest-hook and email subscriptions, while message subscriptions remains the same behavior as +before." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java index 86e85a85c9b..893aa6e6744 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java @@ -35,6 +35,7 @@ import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; +import java.util.Optional; import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with; @@ -92,9 +94,43 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif @Override public ResourceModifiedMessage inflatePersistedResourceModifiedMessage( - IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { + ResourceModifiedMessage theResourceModifiedMessage) { - return inflateResourceModifiedMessageFromEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage); + return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage)); + } + + @Override + public Optional inflatePersistedResourceModifiedMessageOrNull( + ResourceModifiedMessage theResourceModifiedMessage) { + ResourceModifiedMessage inflatedResourceModifiedMessage = null; + + try { + inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage); + } catch (ResourceNotFoundException e) { + IdDt idDt = new IdDt( + theResourceModifiedMessage.getPayloadType(myFhirContext), + theResourceModifiedMessage.getPayloadId(), + theResourceModifiedMessage.getPayloadVersion()); + + ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e); + } catch (Exception ex) { + ourLog.error("Unknown error encountered on inflation of resources.", ex); + } + + return Optional.ofNullable(inflatedResourceModifiedMessage); + } + + @Override + public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation( + IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { + ResourceModifiedMessage resourceModifiedMessage = getPayloadLessMessageFromString( + ((ResourceModifiedEntity) thePersistedResourceModifiedMessage).getSummaryResourceModifiedMessage()); + + IdDt resourceId = + createIdDtFromResourceModifiedEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage); + resourceModifiedMessage.setPayloadId(resourceId); + + return resourceModifiedMessage; } @Override @@ -112,17 +148,13 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity( ResourceModifiedEntity theResourceModifiedEntity) { - String resourcePid = - theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid(); - String resourceVersion = - theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion(); String resourceType = theResourceModifiedEntity.getResourceType(); ResourceModifiedMessage retVal = getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage()); SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId()); - IdDt resourceIdDt = new IdDt(resourceType, resourcePid, resourceVersion); + IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity); IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true); @@ -164,6 +196,16 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif } } + private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) { + String resourcePid = + theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid(); + String resourceVersion = + theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion(); + String resourceType = theResourceModifiedEntity.getResourceType(); + + return new IdDt(resourceType, resourcePid, resourceVersion); + } + private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage { public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java index 2e4dff6e7e8..641e732b642 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java @@ -33,7 +33,9 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import ca.uhn.fhir.util.BundleBuilder; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.text.StringSubstitutor; @@ -48,6 +50,7 @@ import org.springframework.messaging.MessagingException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest; @@ -60,6 +63,9 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl @Autowired protected SubscriptionRegistry mySubscriptionRegistry; + @Autowired + protected IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; + @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; @@ -149,6 +155,13 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl return builder.getBundle(); } + protected Optional inflateResourceModifiedMessageFromDeliveryMessage( + ResourceDeliveryMessage theMsg) { + ResourceModifiedMessage payloadLess = + new ResourceModifiedMessage(theMsg.getPayloadId(myFhirContext), theMsg.getOperationType()); + return myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(payloadLess); + } + @VisibleForTesting public void setFhirContextForUnitTest(FhirContext theCtx) { myFhirContext = theCtx; @@ -174,6 +187,12 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl myMatchUrlService = theMatchUrlService; } + @VisibleForTesting + public void setResourceModifiedMessagePersistenceSvcForUnitTest( + IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc) { + myResourceModifiedMessagePersistenceSvc = theResourceModifiedMessagePersistenceSvc; + } + public IInterceptorBroadcaster getInterceptorBroadcaster() { return myInterceptorBroadcaster; } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/email/SubscriptionDeliveringEmailSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/email/SubscriptionDeliveringEmailSubscriber.java index 01ccf19397a..80275a84317 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/email/SubscriptionDeliveringEmailSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/email/SubscriptionDeliveringEmailSubscriber.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.EncodingEnum; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; @@ -33,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.apache.commons.lang3.StringUtils.defaultString; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -73,7 +75,7 @@ public class SubscriptionDeliveringEmailSubscriber extends BaseSubscriptionDeliv if (isNotBlank(subscription.getPayloadString())) { EncodingEnum encoding = EncodingEnum.forContentType(subscription.getPayloadString()); if (encoding != null) { - payload = theMessage.getPayloadString(); + payload = getPayloadStringFromMessageOrEmptyString(theMessage); } } @@ -112,4 +114,24 @@ public class SubscriptionDeliveringEmailSubscriber extends BaseSubscriptionDeliv public IEmailSender getEmailSender() { return myEmailSender; } + + /** + * Get the payload string, fetch it from the DB when the payload is null. + */ + private String getPayloadStringFromMessageOrEmptyString(ResourceDeliveryMessage theMessage) { + String payload = theMessage.getPayloadString(); + + if (theMessage.getPayload(myCtx) != null) { + return payload; + } + + Optional inflatedMessage = + inflateResourceModifiedMessageFromDeliveryMessage(theMessage); + if (inflatedMessage.isEmpty()) { + return ""; + } + + payload = inflatedMessage.get().getPayloadString(); + return payload; + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java index b801d0e8f95..c15854c143e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java @@ -39,6 +39,7 @@ import org.springframework.messaging.MessagingException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -66,7 +67,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria( theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext)); ResourceModifiedJsonMessage newWrappedMessageToSend = - convertDeliveryMessageToResourceModifiedMessage(theSourceMessage, payloadResource); + convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, payloadResource); theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload()); payloadId = payloadResource.getIdElement().toUnqualifiedVersionless().getValue(); @@ -82,7 +83,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel .getValue()); } - private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage( + private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage( ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) { ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType()); @@ -96,8 +97,17 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException { CanonicalSubscription subscription = theMessage.getSubscription(); IBaseResource payloadResource = theMessage.getPayload(myFhirContext); + if (payloadResource == null) { + Optional inflatedMsg = + inflateResourceModifiedMessageFromDeliveryMessage(theMessage); + if (inflatedMsg.isEmpty()) { + return; + } + payloadResource = inflatedMsg.get().getPayload(myFhirContext); + } + ResourceModifiedJsonMessage messageWrapperToSend = - convertDeliveryMessageToResourceModifiedMessage(theMessage, payloadResource); + convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, payloadResource); // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY HookParams params = new HookParams() diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java index 55914efdde5..405cd94796d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -31,6 +31,7 @@ import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.subscription.SubscriptionConstants; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import ca.uhn.fhir.util.SubscriptionUtil; import org.hl7.fhir.dstu2.model.Subscription; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -41,6 +42,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; +import java.util.Optional; import javax.annotation.Nonnull; /** @@ -64,6 +66,8 @@ public class SubscriptionActivatingSubscriber implements MessageHandler { @Autowired private StorageSettings myStorageSettings; + @Autowired + private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; /** * Constructor */ @@ -86,6 +90,16 @@ public class SubscriptionActivatingSubscriber implements MessageHandler { switch (payload.getOperationType()) { case CREATE: case UPDATE: + if (payload.getPayload(myFhirContext) == null) { + Optional inflatedMsg = + myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull( + payload); + if (inflatedMsg.isEmpty()) { + return; + } + payload = inflatedMsg.get(); + } + activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext)); break; case TRANSACTION: @@ -104,7 +118,7 @@ public class SubscriptionActivatingSubscriber implements MessageHandler { */ public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) { // Grab the value for "Subscription.channel.type" so we can see if this - // subscriber applies.. + // subscriber applies. CanonicalSubscriptionChannelType subscriptionChannelType = mySubscriptionCanonicalizer.getChannelType(theSubscription); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java index 488a961c89d..d123f33e98a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; +import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; @@ -156,8 +157,21 @@ public class SubscriptionMatchDeliverer { ourLog.warn("Failed to send message to Delivery Channel."); } } catch (RuntimeException e) { - ourLog.error("Failed to send message to Delivery Channel", e); - throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e); + if (e.getCause() instanceof PayloadTooLargeException) { + ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker " + + "max message size. Retry is about to be performed without payload."); + ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg); + trySendToDeliveryChannel(msgPayloadLess, theDeliveryChannel); + } else { + ourLog.error("Failed to send message to Delivery Channel", e); + throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e); + } } } + + private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) { + ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload(); + resourceDeliveryMessage.setPayloadToNull(); + return new ResourceDeliveryJsonMessage(resourceDeliveryMessage); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java index 08623ae0221..f2ef0d1302e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java @@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; @@ -40,6 +41,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Collection; +import java.util.Optional; import javax.annotation.Nonnull; import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE; @@ -64,6 +66,9 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { @Autowired private SubscriptionMatchDeliverer mySubscriptionMatchDeliverer; + @Autowired + private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; + /** * Constructor */ @@ -97,6 +102,16 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { return; } + if (theMsg.getPayload(myFhirContext) == null) { + // inflate the message and ignore any resource that cannot be found. + Optional inflatedMsg = + myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(theMsg); + if (inflatedMsg.isEmpty()) { + return; + } + theMsg = inflatedMsg.get(); + } + // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg); if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SynchronousSubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SynchronousSubscriptionMatcherInterceptor.java index 33d655d6a78..33861b5e205 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SynchronousSubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SynchronousSubscriptionMatcherInterceptor.java @@ -20,9 +20,11 @@ package ca.uhn.fhir.jpa.subscription.submit.interceptor; import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc; +import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -49,11 +51,33 @@ public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatch @Override public void afterCommit() { - myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); + doSubmitResourceModified(theResourceModifiedMessage); } }); } else { + doSubmitResourceModified(theResourceModifiedMessage); + } + } + + /** + * Submit the message through the broker channel to the matcher. + * + * Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore + * does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e. + * with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them, + * we should null out the payload body before submitting. This try-catch block only covers the case where the + * payload is too large, which is enough for now. However, for better practice we might want to consider splitting + * this interceptor into two, each for tests with/without DB connection. + * @param theResourceModifiedMessage + */ + private void doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) { + try { myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); + } catch (MessageDeliveryException e) { + if (e.getCause() instanceof PayloadTooLargeException) { + theResourceModifiedMessage.setPayloadToNull(); + myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); + } } } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java index 99d291959ad..e57813bbc1a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java @@ -35,7 +35,6 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries; import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import org.apache.commons.lang3.Validate; -import org.hl7.fhir.r5.model.IdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.ContextRefreshedEvent; @@ -45,8 +44,6 @@ import org.springframework.messaging.MessageDeliveryException; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.support.TransactionCallback; -import java.util.Optional; - import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME; /** @@ -151,12 +148,11 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, boolean wasDeleted = deletePersistedResourceModifiedMessage( thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk()); - Optional optionalResourceModifiedMessage = - inflatePersistedResourceMessage(thePersistedResourceModifiedMessage); + // submit the resource modified message with empty payload, actual inflation is done by the matcher. + resourceModifiedMessage = + createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage); - if (wasDeleted && optionalResourceModifiedMessage.isPresent()) { - // the PK did exist and we were able to deleted it, ie, we are the only one processing the message - resourceModifiedMessage = optionalResourceModifiedMessage.get(); + if (wasDeleted) { submitResourceModified(resourceModifiedMessage); } } catch (MessageDeliveryException exception) { @@ -186,32 +182,10 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, }; } - private Optional inflatePersistedResourceMessage( + private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation( IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { - ResourceModifiedMessage resourceModifiedMessage = null; - - try { - resourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage( - thePersistedResourceModifiedMessage); - - } catch (ResourceNotFoundException e) { - IPersistedResourceModifiedMessagePK persistedResourceModifiedMessagePk = - thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk(); - - IdType idType = new IdType( - thePersistedResourceModifiedMessage.getResourceType(), - persistedResourceModifiedMessagePk.getResourcePid(), - persistedResourceModifiedMessagePk.getResourceVersion()); - - ourLog.warn( - "Scheduled submission will be ignored since resource {} cannot be found", - idType.asStringValue(), - e); - } catch (Exception ex) { - ourLog.error("Unknown error encountered on inflation of resources.", ex); - } - - return Optional.ofNullable(resourceModifiedMessage); + return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation( + thePersistedResourceModifiedMessage); } private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java index 8bfb71cb182..16789be59d3 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java @@ -8,10 +8,13 @@ import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; +import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender; +import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber; import ca.uhn.fhir.jpa.subscription.match.deliver.message.SubscriptionDeliveringMessageSubscriber; import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; @@ -26,6 +29,7 @@ import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IRestfulClientFactory; import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import com.fasterxml.jackson.core.JsonProcessingException; import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.IdType; @@ -33,6 +37,8 @@ import org.hl7.fhir.r4.model.Patient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Answers; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; @@ -57,6 +63,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -71,6 +78,7 @@ public class BaseSubscriptionDeliverySubscriberTest { private SubscriptionDeliveringRestHookSubscriber mySubscriber; private SubscriptionDeliveringMessageSubscriber myMessageSubscriber; + private SubscriptionDeliveringEmailSubscriber myEmailSubscriber; private final FhirContext myCtx = FhirContext.forR4(); @Mock @@ -96,6 +104,12 @@ public class BaseSubscriptionDeliverySubscriberTest { @Mock private MatchUrlService myMatchUrlService; + @Mock + private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; + + @Mock + private IEmailSender myEmailSender; + @BeforeEach public void before() { mySubscriber = new SubscriptionDeliveringRestHookSubscriber(); @@ -109,8 +123,15 @@ public class BaseSubscriptionDeliverySubscriberTest { myMessageSubscriber.setSubscriptionRegistryForUnitTest(mySubscriptionRegistry); myMessageSubscriber.setDaoRegistryForUnitTest(myDaoRegistry); myMessageSubscriber.setMatchUrlServiceForUnitTest(myMatchUrlService); + myMessageSubscriber.setResourceModifiedMessagePersistenceSvcForUnitTest(myResourceModifiedMessagePersistenceSvc); myCtx.setRestfulClientFactory(myRestfulClientFactory); when(myRestfulClientFactory.newGenericClient(any())).thenReturn(myGenericClient); + + myEmailSubscriber = new SubscriptionDeliveringEmailSubscriber(myEmailSender); + myEmailSubscriber.setFhirContextForUnitTest(myCtx); + myEmailSubscriber.setInterceptorBroadcasterForUnitTest(myInterceptorBroadcaster); + myEmailSubscriber.setSubscriptionRegistryForUnitTest(mySubscriptionRegistry); + myEmailSubscriber.setResourceModifiedMessagePersistenceSvcForUnitTest(myResourceModifiedMessagePersistenceSvc); } @Test @@ -400,6 +421,38 @@ public class BaseSubscriptionDeliverySubscriberTest { } } + @ParameterizedTest + @ValueSource(strings = {"message", "email"}) + public void testMessageAndEmailSubscriber_whenPayloadIsNull_shouldTryInflateMessage(String theSubscriber) { + // setup + when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true); + + Patient patient = generatePatient(); + + CanonicalSubscription subscription = generateSubscription(); + + ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); + payload.setSubscription(subscription); + payload.setPayload(myCtx, patient, EncodingEnum.JSON); + payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE); + + // mock the inflated message + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(any()); + + // this will null out the payload but keep the resource id and version. + payload.setPayloadToNull(); + + // execute & verify + switch (theSubscriber) { + case "message" -> + assertThrows(MessagingException.class, () -> myMessageSubscriber.handleMessage(new ResourceDeliveryJsonMessage(payload))); + case "email" -> + assertThrows(MessagingException.class, () -> myEmailSubscriber.handleMessage(new ResourceDeliveryJsonMessage(payload))); + } + + verify(myResourceModifiedMessagePersistenceSvc, times(1)).inflatePersistedResourceModifiedMessageOrNull(any()); + } + @Nonnull private Patient generatePatient() { Patient patient = new Patient(); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/matching/DaoSubscriptionMatcherTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/matching/DaoSubscriptionMatcherTest.java index f1933548254..817eca140e6 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/matching/DaoSubscriptionMatcherTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/matching/DaoSubscriptionMatcherTest.java @@ -15,6 +15,7 @@ import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig; import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -90,6 +91,11 @@ public class DaoSubscriptionMatcherTest { public IEmailSender emailSender(){ return mock(IEmailSender.class); } + + @Bean + public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc() { + return mock(IResourceModifiedMessagePersistenceSvc.class); + } } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java index b7fa0f3df6a..1dbb4cf0721 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java @@ -2,8 +2,10 @@ package ca.uhn.fhir.jpa.subscription.module.cache; import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import org.hl7.fhir.dstu3.model.Subscription; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -18,6 +20,9 @@ public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest private static final String OTHER_ID = "OTHER_ID"; + @Autowired + private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; + @Configuration public static class SpringConfig { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java index b53918743a8..76fd776b5e7 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/config/TestSubscriptionDstu3Config.java @@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import com.google.common.collect.Lists; import org.hl7.fhir.dstu3.model.Subscription; import org.slf4j.Logger; @@ -62,4 +63,9 @@ public class TestSubscriptionDstu3Config { return mock; } + @Bean + public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc() { + return mock(IResourceModifiedMessagePersistenceSvc.class); + } + } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index e5fa3a5ee8e..b081bbdb37f 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import ca.uhn.fhir.test.utilities.JettyUtil; import ca.uhn.test.concurrency.IPointcutLatch; import ca.uhn.test.concurrency.PointcutLatch; @@ -54,6 +55,10 @@ import javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test { public static final ChannelConsumerSettings CONSUMER_OPTIONS = new ChannelConsumerSettings().setConcurrentConsumers(1); @@ -100,6 +105,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base IInterceptorService myInterceptorRegistry; @Autowired private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; + @Autowired + private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; @BeforeEach public void beforeReset() { @@ -140,6 +147,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public T sendResource(T theResource, RequestPartitionId theRequestPartitionId) throws InterruptedException { ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, null, theRequestPartitionId); ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.of(msg)); + mySubscriptionMatchingPost.setExpectedCount(1); ourSubscribableChannel.send(message); mySubscriptionMatchingPost.awaitExpected(); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java index 6f7710b042f..7730df2e400 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java @@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscriba import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import ca.uhn.fhir.util.HapiExtensions; import com.google.common.collect.Lists; import org.hl7.fhir.dstu3.model.BooleanType; @@ -33,6 +34,7 @@ import org.mockito.Mockito; import java.util.Collections; import java.util.List; +import java.util.Optional; import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -434,6 +436,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria; @Mock SubscriptionMatchDeliverer mySubscriptionMatchDeliverer; + @Mock + IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; @InjectMocks SubscriptionMatchingSubscriber subscriber; @@ -445,6 +449,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri when(myInterceptorBroadcaster.callHooks( eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true); when(mySubscriptionRegistry.getAll()).thenReturn(Collections.emptyList()); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message)); subscriber.matchActiveSubscriptionsAndDeliver(message); @@ -465,6 +470,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria); when(myActiveSubscription.getId()).thenReturn("Patient/123"); when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message)); subscriber.matchActiveSubscriptionsAndDeliver(message); @@ -486,6 +492,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri when(myNonDeleteSubscription.getCriteria()).thenReturn(mySubscriptionCriteria); when(myNonDeleteSubscription.getId()).thenReturn("Patient/123"); when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message)); subscriber.matchActiveSubscriptionsAndDeliver(message); @@ -505,6 +512,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri when(myActiveSubscription.getId()).thenReturn("Patient/123"); when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION); when(myCanonicalSubscription.getSendDeleteMessages()).thenReturn(true); + when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message)); subscriber.matchActiveSubscriptionsAndDeliver(message); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java index a14e8794964..4bf3e1fcf3b 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java @@ -20,6 +20,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; +import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; import org.hl7.fhir.r4.model.IdType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -146,5 +147,10 @@ public class WebsocketConnectionValidatorTest { return mock(IEmailSender.class); } + @Bean + public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc(){ + return mock(IResourceModifiedMessagePersistenceSvc.class); + } + } } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java index 2fc3ca8bb20..03b12841055 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java @@ -253,26 +253,28 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test { } @Test - public void testPersistedResourceModifiedMessage_whenFetchFromDb_willEqualOriginalMessage() throws JsonProcessingException { + public void testMethodInflatePersistedResourceModifiedMessage_whenGivenResourceModifiedMessageWithEmptyPayload_willEqualOriginalMessage() { mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); - // given + // setup TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager); Observation obs = sendObservation("zoop", "SNOMED-CT", "theExplicitSource", "theRequestId"); ResourceModifiedMessage originalResourceModifiedMessage = createResourceModifiedMessage(obs); + ResourceModifiedMessage resourceModifiedMessageWithEmptyPayload = createResourceModifiedMessage(obs); + resourceModifiedMessageWithEmptyPayload.setPayloadToNull(); transactionTemplate.execute(tx -> { - IPersistedResourceModifiedMessage persistedResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.persist(originalResourceModifiedMessage); + myResourceModifiedMessagePersistenceSvc.persist(originalResourceModifiedMessage); - // when - ResourceModifiedMessage restoredResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(persistedResourceModifiedMessage); + // execute + ResourceModifiedMessage restoredResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(resourceModifiedMessageWithEmptyPayload); - // then - assertEquals(toJson(originalResourceModifiedMessage), toJson(restoredResourceModifiedMessage)); - assertEquals(originalResourceModifiedMessage, restoredResourceModifiedMessage); + // verify + assertEquals(toJson(originalResourceModifiedMessage), toJson(restoredResourceModifiedMessage)); + assertEquals(originalResourceModifiedMessage, restoredResourceModifiedMessage); - return null; + return null; }); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java index 21d1f97c686..f8194204aaa 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/svc/ResourceModifiedSubmitterSvcTest.java @@ -105,7 +105,7 @@ public class ResourceModifiedSubmitterSvcTest { // given // a successful deletion implies that the message did exist. when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true); - when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage()); + when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage()); // when boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity()); @@ -134,7 +134,7 @@ public class ResourceModifiedSubmitterSvcTest { // when when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())) .thenThrow(new RuntimeException(deleteExMsg)); - when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())) + when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())) .thenThrow(new RuntimeException(inflationExMsg)); // test @@ -180,7 +180,7 @@ public class ResourceModifiedSubmitterSvcTest { // when when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())) .thenReturn(true); - when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())) + when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())) .thenReturn(msg); when(myChannelProducer.send(any())) .thenThrow(new RuntimeException(exceptionString)); @@ -206,7 +206,7 @@ public class ResourceModifiedSubmitterSvcTest { // given // deletion fails, someone else was faster and processed the message when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(false); - when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage()); + when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage()); // when boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity()); @@ -223,7 +223,7 @@ public class ResourceModifiedSubmitterSvcTest { public void testSubmitPersistedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){ // given when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true); - when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage()); + when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage()); // simulate failure writing to the channel when(myChannelProducer.send(any())).thenThrow(new MessageDeliveryException("sendingError")); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java index c98030e643a..bda74f798d7 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceModifiedMessage.java @@ -52,15 +52,15 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im @JsonProperty(value = "partitionId") protected RequestPartitionId myPartitionId; + @JsonProperty(value = "payloadVersion") + protected String myPayloadVersion; + @JsonIgnore protected transient IBaseResource myPayloadDecoded; @JsonIgnore protected transient String myPayloadType; - @JsonIgnore - protected String myPayloadVersion; - /** * Constructor */ @@ -68,6 +68,12 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im super(); } + public BaseResourceModifiedMessage(IIdType theIdType, OperationTypeEnum theOperationType) { + this(); + setOperationType(theOperationType); + setPayloadId(theIdType); + } + public BaseResourceModifiedMessage( FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) { this(); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/PayloadTooLargeException.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/PayloadTooLargeException.java new file mode 100644 index 00000000000..1656f45fc6a --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/PayloadTooLargeException.java @@ -0,0 +1,16 @@ +package ca.uhn.fhir.jpa.subscription.channel.api; + +/** + * This exception represents the message payload exceeded the maximum message size of the broker. Used as a wrapper of + * similar exceptions specific to different message brokers, e.g. kafka.common.errors.RecordTooLargeException. + */ +public class PayloadTooLargeException extends RuntimeException { + + public PayloadTooLargeException(String theMessage) { + super(theMessage); + } + + public PayloadTooLargeException(String theMessage, Throwable theThrowable) { + super(theMessage, theThrowable); + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java index 9cd638d2600..04cf84a0cc6 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceDeliveryMessage.java @@ -108,6 +108,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes myPayloadId = thePayload.getIdElement().toUnqualifiedVersionless().getValue(); } + public void setPayloadToNull() { + myPayloadString = null; + } + @Override public String getPayloadId() { return myPayloadId; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceModifiedMessage.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceModifiedMessage.java index ab4fe6c7de9..e493035f80c 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceModifiedMessage.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ResourceModifiedMessage.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.builder.ToStringBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; /** * Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage @@ -47,6 +48,11 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage { super(); } + public ResourceModifiedMessage(IIdType theIdType, OperationTypeEnum theOperationType) { + super(theIdType, theOperationType); + setPartitionId(RequestPartitionId.defaultPartition()); + } + public ResourceModifiedMessage( FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) { super(theFhirContext, theResource, theOperationType); @@ -79,6 +85,10 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage { mySubscriptionId = theSubscriptionId; } + public void setPayloadToNull() { + myPayload = null; + } + @Override public String toString() { return new ToStringBuilder(this) diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java index 68aad03a48c..5f54b04e544 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import java.util.List; +import java.util.Optional; /** * An implementer of this interface will provide {@link ResourceModifiedMessage} persistence services. @@ -61,10 +62,29 @@ public interface IResourceModifiedMessagePersistenceSvc { /** * Restore a resourceModifiedMessage to its pre persistence representation. * - * @param thePersistedResourceModifiedMessage The message needing restoration. + * @param theResourceModifiedMessage The message needing restoration. * @return The resourceModifiedMessage in its pre persistence form. */ - ResourceModifiedMessage inflatePersistedResourceModifiedMessage( + ResourceModifiedMessage inflatePersistedResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage); + + /** + * Restore a resourceModifiedMessage to its pre persistence representation or null if the resource does not exist. + * + * @param theResourceModifiedMessage + * @return An Optional containing The resourceModifiedMessage in its pre persistence form or null when the resource + * does not exist + */ + Optional inflatePersistedResourceModifiedMessageOrNull( + ResourceModifiedMessage theResourceModifiedMessage); + + /** + * Create a ResourceModifiedMessage without its pre persistence representation, i.e. without the resource body in + * payload + * + * @param thePersistedResourceModifiedMessage The message needing creation + * @return The resourceModifiedMessage without its pre persistence form + */ + ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation( IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage); /**