diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java index 8f2df1f4d77..4d6b44bef62 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java @@ -18,10 +18,7 @@ import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; +import org.springframework.messaging.*; import org.springframework.stereotype.Service; import java.util.Collection; @@ -163,14 +160,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { return; } - ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); - MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); - if (deliveryChannel != null) { - resourceMatched = true; - deliveryChannel.send(wrappedMsg); - } else { - ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext)); - } + resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg); } if (!resourceMatched) { @@ -181,6 +171,31 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { } } + private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { + boolean retval = false; + ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); + MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); + if (deliveryChannel != null) { + retval = true; + trySendToDeliveryChannel(wrappedMsg, deliveryChannel); + } else { + ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext)); + } + return retval; + } + + private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) { + try { + boolean success = theDeliveryChannel.send(theWrappedMsg); + if (!success) { + ourLog.warn("Failed to send message to Delivery Channel."); + } + } catch (RuntimeException e) { + ourLog.error("Failed to send message to Delivery Channel", e); + throw new RuntimeException("Failed to send message to Delivery Channel", e); + } + } + private String getId(ActiveSubscription theActiveSubscription) { return theActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue(); }