From aa37884b2e2dfd5f2519d70d21dc52fc0c9fb169 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 25 Jan 2021 09:51:50 -0500 Subject: [PATCH 1/2] Worked on #2317, force single consumer --- .../ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java index 4616e88752c..18f5b873d15 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java @@ -42,8 +42,6 @@ public class MdmQueueConsumerLoader { private MdmMessageHandler myMdmMessageHandler; @Autowired private IChannelFactory myChannelFactory; - @Autowired - private IMdmSettings myMdmSettings; protected IChannelReceiver myMdmChannel; @@ -51,7 +49,10 @@ public class MdmQueueConsumerLoader { public void startListeningToMdmChannel() { if (myMdmChannel == null) { ChannelConsumerSettings config = new ChannelConsumerSettings(); - config.setConcurrentConsumers(myMdmSettings.getConcurrentConsumers()); + + //All MDM must be done single-threaded + config.setConcurrentConsumers(1); + myMdmChannel = myChannelFactory.getOrCreateReceiver(IMdmSettings.EMPI_CHANNEL_NAME, ResourceModifiedJsonMessage.class, config); if (myMdmChannel == null) { ourLog.error("Unable to create receiver for {}", IMdmSettings.EMPI_CHANNEL_NAME); From c572575412d89d815eb9bc2df15598112992e9dc Mon Sep 17 00:00:00 2001 From: Tadgh Date: Mon, 25 Jan 2021 09:55:31 -0500 Subject: [PATCH 2/2] Remove mutator --- .../ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java | 7 ++++--- .../src/main/java/ca/uhn/fhir/mdm/api/IMdmSettings.java | 1 - .../java/ca/uhn/fhir/mdm/rules/config/MdmSettings.java | 5 ----- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java index 18f5b873d15..3c19cf8b973 100644 --- a/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java +++ b/hapi-fhir-jpaserver-mdm/src/main/java/ca/uhn/fhir/jpa/mdm/broker/MdmQueueConsumerLoader.java @@ -42,6 +42,8 @@ public class MdmQueueConsumerLoader { private MdmMessageHandler myMdmMessageHandler; @Autowired private IChannelFactory myChannelFactory; + @Autowired + private IMdmSettings myMdmSettings; protected IChannelReceiver myMdmChannel; @@ -49,9 +51,8 @@ public class MdmQueueConsumerLoader { public void startListeningToMdmChannel() { if (myMdmChannel == null) { ChannelConsumerSettings config = new ChannelConsumerSettings(); - - //All MDM must be done single-threaded - config.setConcurrentConsumers(1); + + config.setConcurrentConsumers(myMdmSettings.getConcurrentConsumers()); myMdmChannel = myChannelFactory.getOrCreateReceiver(IMdmSettings.EMPI_CHANNEL_NAME, ResourceModifiedJsonMessage.class, config); if (myMdmChannel == null) { diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmSettings.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmSettings.java index 025e3a6cfad..44debcda090 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmSettings.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmSettings.java @@ -26,7 +26,6 @@ import java.util.stream.Collectors; public interface IMdmSettings { - String MDM_CHANNEL_NAME = "mdm"; String EMPI_CHANNEL_NAME = "empi"; // Parallel processing of MDM can result in missed matches. Best to single-thread. diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/config/MdmSettings.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/config/MdmSettings.java index c81570a6855..0c115238b63 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/config/MdmSettings.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/rules/config/MdmSettings.java @@ -67,11 +67,6 @@ public class MdmSettings implements IMdmSettings { return myConcurrentConsumers; } - public MdmSettings setConcurrentConsumers(int theConcurrentConsumers) { - myConcurrentConsumers = theConcurrentConsumers; - return this; - } - public String getScriptText() { return myScriptText; }