From 4368e33fdabf43326ba3e445560a133ee342de0e Mon Sep 17 00:00:00 2001 From: Aditya Dave Date: Fri, 9 Aug 2024 14:52:05 -0400 Subject: [PATCH] 6182 introduce a new pointcut to allow customization before sending message to the broker on mdm submit (#6183) * add a new pointcut for changing message before submitting it to message broker * failing test * fix test by invoking hook when hook is registered * spotless * update docs * changelog * review changes * spotless --- .../ca/uhn/fhir/interceptor/api/Pointcut.java | 16 ++++++++++++++++ .../7_6_0/6182-new-pointcut-for-mdm-submit.yml | 4 ++++ .../jpa/mdm/provider/MdmProviderBatchR4Test.java | 15 +++++++++++++-- .../fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java | 15 +++++++++++++-- 4 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6182-new-pointcut-for-mdm-submit.yml diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java index fd23e3160fb..d2c87d99ab5 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java @@ -2515,6 +2515,22 @@ public enum Pointcut implements IPointcut { MDM_SUBMIT( void.class, "ca.uhn.fhir.rest.api.server.RequestDetails", "ca.uhn.fhir.mdm.model.mdmevents.MdmSubmitEvent"), + /** + * MDM_SUBMIT_PRE_MESSAGE_DELIVERY Hook: + * Invoked immediately before the delivery of a MESSAGE to the broker. + *

+ * Hooks can make changes to the delivery payload. + * Furthermore, modification can be made to the outgoing message, + * for example adding headers or changing message key, + * which will be used for the subsequent processing. + *

+ * Hooks should accept the following parameters: + * + */ + MDM_SUBMIT_PRE_MESSAGE_DELIVERY(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage"), + /** * JPA Hook: * This hook is invoked when a cross-partition reference is about to be diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6182-new-pointcut-for-mdm-submit.yml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6182-new-pointcut-for-mdm-submit.yml new file mode 100644 index 00000000000..06c8280060d --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_6_0/6182-new-pointcut-for-mdm-submit.yml @@ -0,0 +1,4 @@ +--- +type: add +issue: 6182 +title: "A new Pointcut called `MDM_SUBMIT_PRE_MESSAGE_DELIVERY` has been added. If you wish to customize the `ResourceModifiedJsonMessage` sent to the broker, you can do so by implementing this Pointcut, and returning `ResourceModifiedJsonMessage`." diff --git a/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/provider/MdmProviderBatchR4Test.java b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/provider/MdmProviderBatchR4Test.java index 5c43e485a7c..7526d745c6c 100644 --- a/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/provider/MdmProviderBatchR4Test.java +++ b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/provider/MdmProviderBatchR4Test.java @@ -1,8 +1,10 @@ package ca.uhn.fhir.jpa.mdm.provider; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.Hook; import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.mdm.log.Logs; import ca.uhn.fhir.mdm.rules.config.MdmSettings; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; @@ -30,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -245,16 +248,24 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test { Patient janePatient = createPatientAndUpdateLinks(buildJanePatient()); Patient janePatient2 = createPatientAndUpdateLinks(buildJanePatient()); assertLinkCount(5); - + final AtomicBoolean mdmSubmitBeforeMessageDeliveryHookCalled = new AtomicBoolean(); + final Object interceptor = new Object() { + @Hook(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY) + void hookMethod(ResourceModifiedJsonMessage theResourceModifiedJsonMessage) { + mdmSubmitBeforeMessageDeliveryHookCalled.set(true); + } + }; + myInterceptorService.registerInterceptor(interceptor); // When clearMdmLinks(); afterMdmLatch.runWithExpectedCount(3, () -> { myMdmProvider.mdmBatchPatientType(null , null, theSyncOrAsyncRequest); }); - // Then + assertThat(mdmSubmitBeforeMessageDeliveryHookCalled).isTrue(); updatePatientAndUpdateLinks(janePatient); updatePatientAndUpdateLinks(janePatient2); assertLinkCount(3); + myInterceptorService.unregisterInterceptor(interceptor); } } diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java index 4eece31f188..fcb8c280e14 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java @@ -20,6 +20,9 @@ package ca.uhn.fhir.mdm.svc; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; @@ -44,9 +47,12 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc { private MessageChannel myMdmChannelProducer; - private FhirContext myFhirContext; + private final FhirContext myFhirContext; - private IChannelFactory myChannelFactory; + private final IChannelFactory myChannelFactory; + + @Autowired + private IInterceptorBroadcaster myInterceptorBroadcaster; @Override public void submitResourceToMdmChannel(IBaseResource theResource) { @@ -59,6 +65,11 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc { (RequestPartitionId) theResource.getUserData(Constants.RESOURCE_PARTITION_ID)); resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); resourceModifiedJsonMessage.setPayload(resourceModifiedMessage); + if (myInterceptorBroadcaster.hasHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY)) { + final HookParams params = + new HookParams().add(ResourceModifiedJsonMessage.class, resourceModifiedJsonMessage); + myInterceptorBroadcaster.callHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY, params); + } boolean success = getMdmChannelProducer().send(resourceModifiedJsonMessage); if (!success) { ourLog.error("Failed to submit {} to MDM Channel.", resourceModifiedMessage.getPayloadId());