diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6496-add-support-for-mdm-channel-settings-override.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6496-add-support-for-mdm-channel-settings-override.yaml new file mode 100644 index 00000000000..a1ac4767583 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6496-add-support-for-mdm-channel-settings-override.yaml @@ -0,0 +1,4 @@ +--- +type: add +issue: 6496 +title: "Added support for overriding message broker channel settings for MDM message processing." diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java index eddb42905c5..7bf27ca83ea 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java @@ -55,9 +55,13 @@ public class MdmQueueConsumerLoader { startListeningToMdmChannel(); } + protected ChannelConsumerSettings getChannelConsumerSettings() { + return new ChannelConsumerSettings(); + } + private void startListeningToMdmChannel() { if (myMdmChannel == null) { - ChannelConsumerSettings config = new ChannelConsumerSettings(); + ChannelConsumerSettings config = getChannelConsumerSettings(); config.setConcurrentConsumers(myMdmSettings.getConcurrentConsumers()); diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmConsumerConfig.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmConsumerConfig.java index 3a80717f6aa..ec4263266af 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmConsumerConfig.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmConsumerConfig.java @@ -92,7 +92,7 @@ public class MdmConsumerConfig { } @Bean - MdmQueueConsumerLoader mdmQueueConsumerLoader( + public MdmQueueConsumerLoader mdmQueueConsumerLoader( IChannelFactory theChannelFactory, IMdmSettings theMdmSettings, MdmMessageHandler theMdmMessageHandler) { return new MdmQueueConsumerLoader(theChannelFactory, theMdmSettings, theMdmMessageHandler); } @@ -139,7 +139,7 @@ public class MdmConsumerConfig { } @Bean - MdmSubscriptionLoader mdmSubscriptionLoader() { + public MdmSubscriptionLoader mdmSubscriptionLoader() { return new MdmSubscriptionLoader(); } diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmSubscriptionLoader.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmSubscriptionLoader.java index 7d3160c2cee..3f35e0ba5b2 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmSubscriptionLoader.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/config/MdmSubscriptionLoader.java @@ -131,6 +131,10 @@ public class MdmSubscriptionLoader { mySubscriptionTopicDao.update(theSubscriptionTopic, SystemRequestDetails.forAllPartitions()); } + protected ChannelProducerSettings getChannelProducerSettings() { + return new ChannelProducerSettings(); + } + private org.hl7.fhir.dstu3.model.Subscription buildMdmSubscriptionDstu3(String theId, String theCriteria) { org.hl7.fhir.dstu3.model.Subscription retval = new org.hl7.fhir.dstu3.model.Subscription(); retval.setId(theId); @@ -147,7 +151,7 @@ public class MdmSubscriptionLoader { org.hl7.fhir.dstu3.model.Subscription.SubscriptionChannelComponent channel = retval.getChannel(); channel.setType(org.hl7.fhir.dstu3.model.Subscription.SubscriptionChannelType.MESSAGE); channel.setEndpoint("channel:" - + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, new ChannelProducerSettings())); + + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, getChannelProducerSettings())); channel.setPayload(Constants.CT_JSON); return retval; } @@ -168,7 +172,7 @@ public class MdmSubscriptionLoader { Subscription.SubscriptionChannelComponent channel = retval.getChannel(); channel.setType(Subscription.SubscriptionChannelType.MESSAGE); channel.setEndpoint("channel:" - + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, new ChannelProducerSettings())); + + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, getChannelProducerSettings())); channel.setPayload(Constants.CT_JSON); return retval; } @@ -216,7 +220,7 @@ public class MdmSubscriptionLoader { .setSystem(CanonicalSubscriptionChannelType.MESSAGE.getSystem())); subscription.setEndpoint("channel:" - + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, new ChannelProducerSettings())); + + myChannelNamer.getChannelName(IMdmSettings.EMPI_CHANNEL_NAME, getChannelProducerSettings())); subscription.setContentType(Constants.CT_JSON); return Collections.singletonList(subscription); diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java index fcb8c280e14..72440105f0f 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmChannelSubmitterSvcImpl.java @@ -82,8 +82,12 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc { myChannelFactory = theIChannelFactory; } + protected ChannelProducerSettings getChannelProducerSettings() { + return new ChannelProducerSettings(); + } + private void init() { - ChannelProducerSettings channelSettings = new ChannelProducerSettings(); + ChannelProducerSettings channelSettings = getChannelProducerSettings(); myMdmChannelProducer = myChannelFactory.getOrCreateProducer( EMPI_CHANNEL_NAME, ResourceModifiedJsonMessage.class, channelSettings); }