6182 introduce a new pointcut to allow customization before sending message to the broker on mdm submit (#6183)
* add a new pointcut for changing message before submitting it to message broker * failing test * fix test by invoking hook when hook is registered * spotless * update docs * changelog * review changes * spotless
This commit is contained in:
parent
44c4b87a84
commit
4368e33fda
|
@ -2515,6 +2515,22 @@ public enum Pointcut implements IPointcut {
|
||||||
MDM_SUBMIT(
|
MDM_SUBMIT(
|
||||||
void.class, "ca.uhn.fhir.rest.api.server.RequestDetails", "ca.uhn.fhir.mdm.model.mdmevents.MdmSubmitEvent"),
|
void.class, "ca.uhn.fhir.rest.api.server.RequestDetails", "ca.uhn.fhir.mdm.model.mdmevents.MdmSubmitEvent"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <b>MDM_SUBMIT_PRE_MESSAGE_DELIVERY Hook:</b>
|
||||||
|
* Invoked immediately before the delivery of a MESSAGE to the broker.
|
||||||
|
* <p>
|
||||||
|
* Hooks can make changes to the delivery payload.
|
||||||
|
* Furthermore, modification can be made to the outgoing message,
|
||||||
|
* for example adding headers or changing message key,
|
||||||
|
* which will be used for the subsequent processing.
|
||||||
|
* </p>
|
||||||
|
* Hooks should accept the following parameters:
|
||||||
|
* <ul>
|
||||||
|
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
MDM_SUBMIT_PRE_MESSAGE_DELIVERY(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <b>JPA Hook:</b>
|
* <b>JPA Hook:</b>
|
||||||
* This hook is invoked when a cross-partition reference is about to be
|
* This hook is invoked when a cross-partition reference is about to be
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
type: add
|
||||||
|
issue: 6182
|
||||||
|
title: "A new Pointcut called `MDM_SUBMIT_PRE_MESSAGE_DELIVERY` has been added. If you wish to customize the `ResourceModifiedJsonMessage` sent to the broker, you can do so by implementing this Pointcut, and returning `ResourceModifiedJsonMessage`."
|
|
@ -1,8 +1,10 @@
|
||||||
package ca.uhn.fhir.jpa.mdm.provider;
|
package ca.uhn.fhir.jpa.mdm.provider;
|
||||||
|
|
||||||
import ca.uhn.fhir.i18n.Msg;
|
import ca.uhn.fhir.i18n.Msg;
|
||||||
|
import ca.uhn.fhir.interceptor.api.Hook;
|
||||||
import ca.uhn.fhir.interceptor.api.IInterceptorService;
|
import ca.uhn.fhir.interceptor.api.IInterceptorService;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||||
import ca.uhn.fhir.mdm.log.Logs;
|
import ca.uhn.fhir.mdm.log.Logs;
|
||||||
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
|
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
|
@ -30,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
@ -245,16 +248,24 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
|
||||||
Patient janePatient = createPatientAndUpdateLinks(buildJanePatient());
|
Patient janePatient = createPatientAndUpdateLinks(buildJanePatient());
|
||||||
Patient janePatient2 = createPatientAndUpdateLinks(buildJanePatient());
|
Patient janePatient2 = createPatientAndUpdateLinks(buildJanePatient());
|
||||||
assertLinkCount(5);
|
assertLinkCount(5);
|
||||||
|
final AtomicBoolean mdmSubmitBeforeMessageDeliveryHookCalled = new AtomicBoolean();
|
||||||
|
final Object interceptor = new Object() {
|
||||||
|
@Hook(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY)
|
||||||
|
void hookMethod(ResourceModifiedJsonMessage theResourceModifiedJsonMessage) {
|
||||||
|
mdmSubmitBeforeMessageDeliveryHookCalled.set(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
myInterceptorService.registerInterceptor(interceptor);
|
||||||
// When
|
// When
|
||||||
clearMdmLinks();
|
clearMdmLinks();
|
||||||
afterMdmLatch.runWithExpectedCount(3, () -> {
|
afterMdmLatch.runWithExpectedCount(3, () -> {
|
||||||
myMdmProvider.mdmBatchPatientType(null , null, theSyncOrAsyncRequest);
|
myMdmProvider.mdmBatchPatientType(null , null, theSyncOrAsyncRequest);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Then
|
// Then
|
||||||
|
assertThat(mdmSubmitBeforeMessageDeliveryHookCalled).isTrue();
|
||||||
updatePatientAndUpdateLinks(janePatient);
|
updatePatientAndUpdateLinks(janePatient);
|
||||||
updatePatientAndUpdateLinks(janePatient2);
|
updatePatientAndUpdateLinks(janePatient2);
|
||||||
assertLinkCount(3);
|
assertLinkCount(3);
|
||||||
|
myInterceptorService.unregisterInterceptor(interceptor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
package ca.uhn.fhir.mdm.svc;
|
package ca.uhn.fhir.mdm.svc;
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||||
|
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||||
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
|
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
|
||||||
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
|
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
|
||||||
|
@ -44,9 +47,12 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc {
|
||||||
|
|
||||||
private MessageChannel myMdmChannelProducer;
|
private MessageChannel myMdmChannelProducer;
|
||||||
|
|
||||||
private FhirContext myFhirContext;
|
private final FhirContext myFhirContext;
|
||||||
|
|
||||||
private IChannelFactory myChannelFactory;
|
private final IChannelFactory myChannelFactory;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IInterceptorBroadcaster myInterceptorBroadcaster;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void submitResourceToMdmChannel(IBaseResource theResource) {
|
public void submitResourceToMdmChannel(IBaseResource theResource) {
|
||||||
|
@ -59,6 +65,11 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc {
|
||||||
(RequestPartitionId) theResource.getUserData(Constants.RESOURCE_PARTITION_ID));
|
(RequestPartitionId) theResource.getUserData(Constants.RESOURCE_PARTITION_ID));
|
||||||
resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED);
|
resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED);
|
||||||
resourceModifiedJsonMessage.setPayload(resourceModifiedMessage);
|
resourceModifiedJsonMessage.setPayload(resourceModifiedMessage);
|
||||||
|
if (myInterceptorBroadcaster.hasHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY)) {
|
||||||
|
final HookParams params =
|
||||||
|
new HookParams().add(ResourceModifiedJsonMessage.class, resourceModifiedJsonMessage);
|
||||||
|
myInterceptorBroadcaster.callHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY, params);
|
||||||
|
}
|
||||||
boolean success = getMdmChannelProducer().send(resourceModifiedJsonMessage);
|
boolean success = getMdmChannelProducer().send(resourceModifiedJsonMessage);
|
||||||
if (!success) {
|
if (!success) {
|
||||||
ourLog.error("Failed to submit {} to MDM Channel.", resourceModifiedMessage.getPayloadId());
|
ourLog.error("Failed to submit {} to MDM Channel.", resourceModifiedMessage.getPayloadId());
|
||||||
|
|
Loading…
Reference in New Issue