From 0a1e782a8ea4c29eb490836ebd0360c3e6999635 Mon Sep 17 00:00:00 2001
From: Tadgh
Date: Thu, 21 Jul 2022 21:56:27 -0400
Subject: [PATCH] 3817 enhance subscription before message delivery (#3824)
* Add changelog
* implementation and changes
* collection implementation
* tidy
---
.../ca/uhn/fhir/interceptor/api/Pointcut.java | 6 ++-
.../6_1_0/3817-enhance-pointcut.yaml | 4 ++
.../BaseSubscriptionDeliverySubscriber.java | 1 +
...bscriptionDeliveringMessageSubscriber.java | 23 +++++-----
...aseSubscriptionDeliverySubscriberTest.java | 42 +++++++++++++++++++
.../messaging/json/HapiMessageHeaders.java | 3 ++
6 files changed, 65 insertions(+), 14 deletions(-)
create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3817-enhance-pointcut.yaml
diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java
index 9edd279947c..61ecb4fee3a 100644
--- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java
+++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java
@@ -871,11 +871,15 @@ public enum Pointcut implements IPointcut {
* Hooks may make changes to the delivery payload, or make changes to the
* canonical subscription such as adding headers, modifying the channel
* endpoint, etc.
+ * Furthermore, you may modify the outgoing message wrapper, for example adding headers via ResourceModifiedJsonMessage field.
+ *
*
* Hooks may accept the following parameters:
*
* - ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription
* - ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage
+ * - ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage
+ *
*
*
* Hooks may return void
or may return a boolean
. If the method returns
@@ -883,7 +887,7 @@ public enum Pointcut implements IPointcut {
* returns false
, processing will be aborted.
*
*/
- SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY(boolean.class, "ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage"),
+ SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY(boolean.class, "ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage", "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage"),
/**
diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3817-enhance-pointcut.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3817-enhance-pointcut.yaml
new file mode 100644
index 00000000000..ff4c7282f08
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_1_0/3817-enhance-pointcut.yaml
@@ -0,0 +1,4 @@
+---
+type: add
+issue: 3817
+title: "The 'SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY' pointcut now supports a new parameter, `ResourceModifiedJsonMessage`. This permits interceptor implementers to modify the outgoing envelope before it is sent off. "
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java
index c87f6c272a6..0abccb0ecb9 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java
@@ -29,6 +29,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
+import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java
index e514681d5ec..b5a8285a798 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java
@@ -32,11 +32,9 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
-import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;
@@ -57,31 +55,30 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
myChannelFactory = theChannelFactory;
}
- protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer) {
- IBaseResource payloadResource = theMsg.getPayload(myFhirContext);
-
- // Regardless of whether we have a payload, the message should be sent.
- doDelivery(theMsg, theSubscription, theChannelProducer, payloadResource);
+ protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theMessage) {
+ theChannelProducer.send(theMessage);
+ ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
}
- protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
+ private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) {
+ IBaseResource thePayloadResource = theMsg.getPayload(myFhirContext);
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
payload.setMessageKey(theMsg.getMessageKeyOrNull());
payload.setTransactionId(theMsg.getTransactionId());
payload.setPartitionId(theMsg.getRequestPartitionId());
- ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
- theChannelProducer.send(message);
- ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
+ return new ResourceModifiedJsonMessage(payload);
}
@Override
public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
CanonicalSubscription subscription = theMessage.getSubscription();
+ ResourceModifiedJsonMessage messageWrapperToSend = convertDeliveryMessageToResourceModifiedMessage(theMessage);
// Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
HookParams params = new HookParams()
.add(CanonicalSubscription.class, subscription)
- .add(ResourceDeliveryMessage.class, theMessage);
+ .add(ResourceDeliveryMessage.class, theMessage)
+ .add(ResourceModifiedJsonMessage.class, messageWrapperToSend);
if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) {
return;
}
@@ -106,7 +103,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
throw new UnsupportedOperationException(Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions");
}
- deliverPayload(theMessage, subscription, channelProducer);
+ doDelivery(theMessage, subscription, channelProducer, messageWrapperToSend);
// Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
params = new HookParams()
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java
index f4c0b7550c6..41e756de99c 100644
--- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriberTest.java
@@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
@@ -38,10 +40,15 @@ import org.springframework.messaging.support.GenericMessage;
import javax.annotation.Nonnull;
import java.net.URISyntaxException;
import java.time.LocalDate;
+import java.util.Collection;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -171,6 +178,41 @@ public class BaseSubscriptionDeliverySubscriberTest {
verify(myGenericClient, times(1)).update();
}
+ @Test
+ public void testMessageSubscriber_PermitsInterceptorsToModifyOutgoingEnvelope() throws URISyntaxException {
+
+ //Given: We setup mocks, and have this mock interceptor inject those headers.
+ when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY), ArgumentMatchers.any(HookParams.class))).thenAnswer(t -> {
+ HookParams argument = t.getArgument(1, HookParams.class);
+ ResourceModifiedJsonMessage resourceModifiedJsonMessage = argument.get(ResourceModifiedJsonMessage.class);
+ resourceModifiedJsonMessage.getHapiHeaders().getCustomHeaders().put("foo", List.of("bar", "bar2"));
+ return true;
+ });
+ when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
+ when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
+
+ CanonicalSubscription subscription = generateSubscription();
+ Patient patient = generatePatient();
+
+ ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
+ payload.setSubscription(subscription);
+ payload.setPayload(myCtx, patient, EncodingEnum.JSON);
+ payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
+
+ //When: We submit the message for delivery
+ myMessageSubscriber.handleMessage(payload);
+
+ //Then: The receiving channel should also receive the custom headers.
+ ArgumentCaptor captor = ArgumentCaptor.forClass(ResourceModifiedJsonMessage.class);
+ verify(myChannelProducer).send(captor.capture());
+ final List messages = captor.getAllValues();
+ assertThat(messages, hasSize(1));
+ ResourceModifiedJsonMessage receivedMessage = messages.get(0);
+ Collection foo = (Collection) receivedMessage.getHapiHeaders().getCustomHeaders().get("foo");
+
+ assertThat(foo, containsInAnyOrder("bar", "bar2"));
+ }
+
@Test
public void testRestHookDeliveryAbortedByInterceptor() {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY), any())).thenReturn(true);
diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/json/HapiMessageHeaders.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/json/HapiMessageHeaders.java
index bb9f3de020e..dc5249c6483 100644
--- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/json/HapiMessageHeaders.java
+++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/json/HapiMessageHeaders.java
@@ -83,6 +83,9 @@ public class HapiMessageHeaders implements IModelJson {
public Map getCustomHeaders() {
+ if (this.headers == null) {
+ return new HashMap<>();
+ }
return this.headers;
}