3817 enhance subscription before message delivery (#3824)

* Add changelog

* implementation and changes

* collection implementation

* tidy
This commit is contained in:
Tadgh 2022-07-21 21:56:27 -04:00 committed by GitHub
parent f5697e13c9
commit 0a1e782a8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 14 deletions

View File

@ -871,11 +871,15 @@ public enum Pointcut implements IPointcut {
* Hooks may make changes to the delivery payload, or make changes to the
* canonical subscription such as adding headers, modifying the channel
* endpoint, etc.
* Furthermore, you may modify the outgoing message wrapper, for example adding headers via ResourceModifiedJsonMessage field.
*
* </p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription</li>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage</li>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage</li>
*
* </ul>
* <p>
* Hooks may return <code>void</code> or may return a <code>boolean</code>. If the method returns
@ -883,7 +887,7 @@ public enum Pointcut implements IPointcut {
* returns <code>false</code>, processing will be aborted.
* </p>
*/
SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY(boolean.class, "ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage"),
SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY(boolean.class, "ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage", "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage"),
/**

View File

@ -0,0 +1,4 @@
---
type: add
issue: 3817
title: "The 'SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY' pointcut now supports a new parameter, `ResourceModifiedJsonMessage`. This permits interceptor implementers to modify the outgoing envelope before it is sent off. "

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;

View File

@ -32,11 +32,9 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;
@ -57,31 +55,30 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
myChannelFactory = theChannelFactory;
}
protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer) {
IBaseResource payloadResource = theMsg.getPayload(myFhirContext);
// Regardless of whether we have a payload, the message should be sent.
doDelivery(theMsg, theSubscription, theChannelProducer, payloadResource);
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theMessage) {
theChannelProducer.send(theMessage);
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
}
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) {
IBaseResource thePayloadResource = theMsg.getPayload(myFhirContext);
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
payload.setMessageKey(theMsg.getMessageKeyOrNull());
payload.setTransactionId(theMsg.getTransactionId());
payload.setPartitionId(theMsg.getRequestPartitionId());
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
theChannelProducer.send(message);
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
return new ResourceModifiedJsonMessage(payload);
}
@Override
public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
CanonicalSubscription subscription = theMessage.getSubscription();
ResourceModifiedJsonMessage messageWrapperToSend = convertDeliveryMessageToResourceModifiedMessage(theMessage);
// Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
HookParams params = new HookParams()
.add(CanonicalSubscription.class, subscription)
.add(ResourceDeliveryMessage.class, theMessage);
.add(ResourceDeliveryMessage.class, theMessage)
.add(ResourceModifiedJsonMessage.class, messageWrapperToSend);
if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) {
return;
}
@ -106,7 +103,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
throw new UnsupportedOperationException(Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions");
}
deliverPayload(theMessage, subscription, channelProducer);
doDelivery(theMessage, subscription, channelProducer, messageWrapperToSend);
// Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
params = new HookParams()

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
@ -38,10 +40,15 @@ import org.springframework.messaging.support.GenericMessage;
import javax.annotation.Nonnull;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.util.Collection;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@ -171,6 +178,41 @@ public class BaseSubscriptionDeliverySubscriberTest {
verify(myGenericClient, times(1)).update();
}
@Test
public void testMessageSubscriber_PermitsInterceptorsToModifyOutgoingEnvelope() throws URISyntaxException {
//Given: We setup mocks, and have this mock interceptor inject those headers.
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY), ArgumentMatchers.any(HookParams.class))).thenAnswer(t -> {
HookParams argument = t.getArgument(1, HookParams.class);
ResourceModifiedJsonMessage resourceModifiedJsonMessage = argument.get(ResourceModifiedJsonMessage.class);
resourceModifiedJsonMessage.getHapiHeaders().getCustomHeaders().put("foo", List.of("bar", "bar2"));
return true;
});
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
CanonicalSubscription subscription = generateSubscription();
Patient patient = generatePatient();
ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription);
payload.setPayload(myCtx, patient, EncodingEnum.JSON);
payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
//When: We submit the message for delivery
myMessageSubscriber.handleMessage(payload);
//Then: The receiving channel should also receive the custom headers.
ArgumentCaptor<ResourceModifiedJsonMessage> captor = ArgumentCaptor.forClass(ResourceModifiedJsonMessage.class);
verify(myChannelProducer).send(captor.capture());
final List<ResourceModifiedJsonMessage> messages = captor.getAllValues();
assertThat(messages, hasSize(1));
ResourceModifiedJsonMessage receivedMessage = messages.get(0);
Collection<String> foo = (Collection<String>) receivedMessage.getHapiHeaders().getCustomHeaders().get("foo");
assertThat(foo, containsInAnyOrder("bar", "bar2"));
}
@Test
public void testRestHookDeliveryAbortedByInterceptor() {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY), any())).thenReturn(true);

View File

@ -83,6 +83,9 @@ public class HapiMessageHeaders implements IModelJson {
public Map<String, Object> getCustomHeaders() {
if (this.headers == null) {
return new HashMap<>();
}
return this.headers;
}