From 59bf8b836fb08f9badb61c8f6b933656ee8a07fa Mon Sep 17 00:00:00 2001 From: TipzCM Date: Wed, 3 Nov 2021 18:21:10 -0400 Subject: [PATCH] Issue 3120 support adding retry extension (#3137) * issue-1134 initial work on adding retry handler * issue-1134 subdscription creation fixes * issue-1134 update the test * issue-1134 retry policy off canonicalsubscription * issue 1134 base subscription changes * add failing test * issue 1134 retry configs on base parameters * issue 1134 passing forward params * issue-1134 added more tests for making sure subscriptions create their channels correctly * issue-1134 updates to get the retry to channels * issue-1134 updating channel factory * issue-1134 remove the dlq since it's not defineable * issue-3120 changelog added * issue-3120 cleaning up * issue-3120 test fix * issue-3120 review fixes * issue-3120 fixed bad master merge * issue-3120 updates for new release Co-authored-by: leif stawnyczy Co-authored-by: Ken Stevens --- hapi-deployable-pom/pom.xml | 2 +- hapi-fhir-android/pom.xml | 2 +- hapi-fhir-base/pom.xml | 2 +- .../java/ca/uhn/fhir/util/HapiExtensions.java | 8 + hapi-fhir-batch/pom.xml | 2 +- hapi-fhir-bom/pom.xml | 4 +- hapi-fhir-cli/hapi-fhir-cli-api/pom.xml | 2 +- hapi-fhir-cli/hapi-fhir-cli-app/pom.xml | 2 +- hapi-fhir-cli/hapi-fhir-cli-jpaserver/pom.xml | 2 +- hapi-fhir-cli/pom.xml | 2 +- hapi-fhir-client-okhttp/pom.xml | 2 +- hapi-fhir-client/pom.xml | 2 +- hapi-fhir-converter/pom.xml | 2 +- hapi-fhir-dist/pom.xml | 2 +- hapi-fhir-docs/pom.xml | 2 +- ...3120-add-retry-subscription-extension.yaml | 5 + hapi-fhir-jacoco/pom.xml | 2 +- hapi-fhir-jaxrsserver-base/pom.xml | 2 +- hapi-fhir-jpa/pom.xml | 2 +- hapi-fhir-jpaserver-base/pom.xml | 2 +- hapi-fhir-jpaserver-cql/pom.xml | 2 +- hapi-fhir-jpaserver-mdm/pom.xml | 2 +- hapi-fhir-jpaserver-model/pom.xml | 2 +- hapi-fhir-jpaserver-searchparam/pom.xml | 2 +- hapi-fhir-jpaserver-subscription/pom.xml | 2 +- .../channel/models/BaseChannelParameters.java | 29 +++ .../models/ProducingChannelParameters.java | 16 ++ .../models/ReceivingChannelParameters.java | 15 ++ .../SubscriptionChannelRegistry.java | 51 ++++- .../BaseSubscriptionDeliverySubscriber.java | 4 +- .../SubscriptionMatchingSubscriber.java | 1 - .../match/registry/ActiveSubscription.java | 12 ++ .../match/registry/SubscriptionRegistry.java | 50 ++++- .../SubscriptionChannelRegistryTest.java | 107 +++++++++++ .../registry/SubscriptionRegistryTest.java | 180 ++++++++++++++++++ .../module/BaseSubscriptionTest.java | 10 + .../cache/BaseSubscriptionRegistryTest.java | 14 ++ .../cache/SubscriptionRegistryTest.java | 5 +- hapi-fhir-jpaserver-test-utilities/pom.xml | 2 +- hapi-fhir-jpaserver-uhnfhirtest/pom.xml | 2 +- .../ca/uhn/fhirtest/config/CommonConfig.java | 1 - hapi-fhir-server-mdm/pom.xml | 2 +- hapi-fhir-server-openapi/pom.xml | 2 +- hapi-fhir-server/pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- .../hapi-fhir-spring-boot-samples/pom.xml | 2 +- .../hapi-fhir-spring-boot-starter/pom.xml | 2 +- hapi-fhir-spring-boot/pom.xml | 2 +- hapi-fhir-sql-migrate/pom.xml | 2 +- hapi-fhir-storage/pom.xml | 2 +- .../channel/api/BaseChannelSettings.java | 13 ++ .../impl/LinkedBlockingChannelFactory.java | 7 +- ...roadcastingSubscribableChannelWrapper.java | 1 - .../SubscriptionChannelFactory.java | 11 ++ .../registry/SubscriptionCanonicalizer.java | 5 +- .../model/ChannelRetryConfiguration.java | 16 ++ hapi-fhir-structures-dstu2.1/pom.xml | 2 +- hapi-fhir-structures-dstu2/pom.xml | 2 +- hapi-fhir-structures-dstu3/pom.xml | 2 +- hapi-fhir-structures-hl7org-dstu2/pom.xml | 2 +- hapi-fhir-structures-r4/pom.xml | 2 +- hapi-fhir-structures-r5/pom.xml | 2 +- hapi-fhir-test-utilities/pom.xml | 2 +- .../HashMapResourceProviderExtension.java | 14 +- hapi-fhir-testpage-overlay/pom.xml | 2 +- .../pom.xml | 2 +- hapi-fhir-validation-resources-dstu2/pom.xml | 2 +- hapi-fhir-validation-resources-dstu3/pom.xml | 2 +- hapi-fhir-validation-resources-r4/pom.xml | 2 +- hapi-fhir-validation-resources-r5/pom.xml | 2 +- hapi-fhir-validation/pom.xml | 2 +- hapi-tinder-plugin/pom.xml | 16 +- hapi-tinder-test/pom.xml | 2 +- pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- 80 files changed, 608 insertions(+), 97 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3120-add-retry-subscription-extension.yaml create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/BaseChannelParameters.java create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ProducingChannelParameters.java create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ReceivingChannelParameters.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistryTest.java create mode 100644 hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ChannelRetryConfiguration.java diff --git a/hapi-deployable-pom/pom.xml b/hapi-deployable-pom/pom.xml index e0ec776f46a..c71f2288fb4 100644 --- a/hapi-deployable-pom/pom.xml +++ b/hapi-deployable-pom/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-android/pom.xml b/hapi-fhir-android/pom.xml index b4265c53ed9..fb56a5c1121 100644 --- a/hapi-fhir-android/pom.xml +++ b/hapi-fhir-android/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-base/pom.xml b/hapi-fhir-base/pom.xml index 3db9367f1eb..c4a4c5398bb 100644 --- a/hapi-fhir-base/pom.xml +++ b/hapi-fhir-base/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java index 6dfdca94d4b..188f5af8258 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/HapiExtensions.java @@ -122,6 +122,14 @@ public class HapiExtensions { */ public static final String EXT_OP_PARAMETER_EXAMPLE_VALUE = "http://hapifhir.io/fhir/StructureDefinition/op-parameter-example-value"; + /** + * This extension provides a way for subscribers to provide + * a "retry-count". + * If provided, subscriptions will be retried this many times + * (to a total of retry-count + 1 (for original attempt) + */ + public static final String EX_RETRY_COUNT = "http://hapifhir.io/fhir/StructureDefinition/subscription-delivery-retry-count"; + /** * Non instantiable */ diff --git a/hapi-fhir-batch/pom.xml b/hapi-fhir-batch/pom.xml index efc0a85dbf9..36f6dbe1b98 100644 --- a/hapi-fhir-batch/pom.xml +++ b/hapi-fhir-batch/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-bom/pom.xml b/hapi-fhir-bom/pom.xml index 36e4d6505bb..fa69ca37bfd 100644 --- a/hapi-fhir-bom/pom.xml +++ b/hapi-fhir-bom/pom.xml @@ -3,14 +3,14 @@ 4.0.0 ca.uhn.hapi.fhir hapi-fhir-bom - 5.7.0-PRE1-SNAPSHOT + 5.6.0-PRE9-SNAPSHOT pom HAPI FHIR BOM ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/pom.xml b/hapi-fhir-cli/hapi-fhir-cli-api/pom.xml index 2d4570f358e..c94e446a1a6 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-api/pom.xml +++ b/hapi-fhir-cli/hapi-fhir-cli-api/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-cli/hapi-fhir-cli-app/pom.xml b/hapi-fhir-cli/hapi-fhir-cli-app/pom.xml index 4851589d135..98987908202 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-app/pom.xml +++ b/hapi-fhir-cli/hapi-fhir-cli-app/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-fhir-cli - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/pom.xml b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/pom.xml index 1d6aa0062fa..b135181d173 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/pom.xml +++ b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../hapi-deployable-pom diff --git a/hapi-fhir-cli/pom.xml b/hapi-fhir-cli/pom.xml index a5dff2fcddd..f34353b7737 100644 --- a/hapi-fhir-cli/pom.xml +++ b/hapi-fhir-cli/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-client-okhttp/pom.xml b/hapi-fhir-client-okhttp/pom.xml index 2c28b23d52e..c487da14a34 100644 --- a/hapi-fhir-client-okhttp/pom.xml +++ b/hapi-fhir-client-okhttp/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-client/pom.xml b/hapi-fhir-client/pom.xml index d5af038afd8..e7fae45b765 100644 --- a/hapi-fhir-client/pom.xml +++ b/hapi-fhir-client/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-converter/pom.xml b/hapi-fhir-converter/pom.xml index ceb62e7fbe0..526ac422885 100644 --- a/hapi-fhir-converter/pom.xml +++ b/hapi-fhir-converter/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-dist/pom.xml b/hapi-fhir-dist/pom.xml index c25b43bfe8e..e255b561986 100644 --- a/hapi-fhir-dist/pom.xml +++ b/hapi-fhir-dist/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-docs/pom.xml b/hapi-fhir-docs/pom.xml index 65f8e602547..c33946d0dfa 100644 --- a/hapi-fhir-docs/pom.xml +++ b/hapi-fhir-docs/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3120-add-retry-subscription-extension.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3120-add-retry-subscription-extension.yaml new file mode 100644 index 00000000000..2b5d5e01b1b --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/3120-add-retry-subscription-extension.yaml @@ -0,0 +1,5 @@ +--- +type: add +issue: 3120 +title: "Added http://hapifhir.io/fhir/StructureDefinition/subscription-delivery-retry-count extension that can be provided +to a subscription to define a specific retry strategy (retry retry-count number of times before giving up)." diff --git a/hapi-fhir-jacoco/pom.xml b/hapi-fhir-jacoco/pom.xml index 4cf50ace985..84c333ecc8d 100644 --- a/hapi-fhir-jacoco/pom.xml +++ b/hapi-fhir-jacoco/pom.xml @@ -11,7 +11,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jaxrsserver-base/pom.xml b/hapi-fhir-jaxrsserver-base/pom.xml index 682eff68d50..75713c7107d 100644 --- a/hapi-fhir-jaxrsserver-base/pom.xml +++ b/hapi-fhir-jaxrsserver-base/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpa/pom.xml b/hapi-fhir-jpa/pom.xml index f3f857dc535..e9748c6b928 100644 --- a/hapi-fhir-jpa/pom.xml +++ b/hapi-fhir-jpa/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml 4.0.0 diff --git a/hapi-fhir-jpaserver-base/pom.xml b/hapi-fhir-jpaserver-base/pom.xml index d55ef3cb494..169ac3fadfb 100644 --- a/hapi-fhir-jpaserver-base/pom.xml +++ b/hapi-fhir-jpaserver-base/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-cql/pom.xml b/hapi-fhir-jpaserver-cql/pom.xml index 6bf9fd07a7b..acc9836c90b 100644 --- a/hapi-fhir-jpaserver-cql/pom.xml +++ b/hapi-fhir-jpaserver-cql/pom.xml @@ -7,7 +7,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-mdm/pom.xml b/hapi-fhir-jpaserver-mdm/pom.xml index 40a7d8209c2..e1f84effd79 100644 --- a/hapi-fhir-jpaserver-mdm/pom.xml +++ b/hapi-fhir-jpaserver-mdm/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-model/pom.xml b/hapi-fhir-jpaserver-model/pom.xml index 0ec27929518..3b9faf72a76 100644 --- a/hapi-fhir-jpaserver-model/pom.xml +++ b/hapi-fhir-jpaserver-model/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-searchparam/pom.xml b/hapi-fhir-jpaserver-searchparam/pom.xml index eda296ebacb..fffe883c5e6 100755 --- a/hapi-fhir-jpaserver-searchparam/pom.xml +++ b/hapi-fhir-jpaserver-searchparam/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml index 186bf50b351..a3a14ba25b6 100644 --- a/hapi-fhir-jpaserver-subscription/pom.xml +++ b/hapi-fhir-jpaserver-subscription/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/BaseChannelParameters.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/BaseChannelParameters.java new file mode 100644 index 00000000000..3e7aea37e6b --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/BaseChannelParameters.java @@ -0,0 +1,29 @@ +package ca.uhn.fhir.jpa.subscription.channel.models; + +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; + +public class BaseChannelParameters { + + private final String myChannelName; + + private ChannelRetryConfiguration myRetryConfiguration; + + /** + * Constructor + */ + public BaseChannelParameters(String theChannelName) { + myChannelName = theChannelName; + } + + public String getChannelName() { + return myChannelName; + } + + public void setRetryConfiguration(ChannelRetryConfiguration theConfiguration) { + myRetryConfiguration = theConfiguration; + } + + public ChannelRetryConfiguration getRetryConfiguration() { + return myRetryConfiguration; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ProducingChannelParameters.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ProducingChannelParameters.java new file mode 100644 index 00000000000..b0eee4700bf --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ProducingChannelParameters.java @@ -0,0 +1,16 @@ +package ca.uhn.fhir.jpa.subscription.channel.models; + +public class ProducingChannelParameters extends BaseChannelParameters { + + /** + * Constructor + * + * Producing channels are sending channels. They send data to topics/queues. + * + * @param theChannelName + */ + public ProducingChannelParameters(String theChannelName) { + super(theChannelName); + } + +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ReceivingChannelParameters.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ReceivingChannelParameters.java new file mode 100644 index 00000000000..38efe944820 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/models/ReceivingChannelParameters.java @@ -0,0 +1,15 @@ +package ca.uhn.fhir.jpa.subscription.channel.models; + +public class ReceivingChannelParameters extends BaseChannelParameters { + + /** + * Constructor + * + * Receiving channels are channels that receive data from topics/queues + * + * @param theChannelName + */ + public ReceivingChannelParameters(String theChannelName) { + super(theChannelName); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java index d8738669cef..77ee2a70701 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java @@ -20,10 +20,15 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription; * #L% */ +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; +import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters; +import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; import com.google.common.collect.Multimap; import com.google.common.collect.MultimapBuilder; import org.slf4j.Logger; @@ -60,29 +65,63 @@ public class SubscriptionChannelRegistry { return; } - IChannelReceiver channelReceiver = newReceivingChannel(channelName); + // we get the retry configurations from the cannonicalized subscriber + // these will be provided to both the producer and receiver channel + ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters(); + + /* + * When we create a subscription, we create both + * a producing/sending channel and + * a receiving channel. + * + * Matched subscriptions are sent to the Sending channel + * and the sending channel sends to subscription matching service. + * + * Receiving channel will send it out to + * the subscriber hook (REST, email, etc). + */ + + // the receiving channel + // this sends to the hook (resthook/message/email/whatever) + ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName); + receivingParameters.setRetryConfiguration(retryConfigParameters); + + IChannelReceiver channelReceiver = newReceivingChannel(receivingParameters); Optional deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType()); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, channelReceiver); deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers); - IChannelProducer sendingChannel = newSendingChannel(channelName); + // create the producing channel. + // channel used for sending to subscription matcher + ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName); + producingChannelParameters.setRetryConfiguration(retryConfigParameters); + + IChannelProducer sendingChannel = newSendingChannel(producingChannelParameters); myChannelNameToSender.put(channelName, sendingChannel); } - protected IChannelReceiver newReceivingChannel(String theChannelName) { - return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theChannelName, null); + protected IChannelReceiver newReceivingChannel(ReceivingChannelParameters theParameters) { + ChannelConsumerSettings settings = new ChannelConsumerSettings(); + settings.setRetryConfiguration(theParameters.getRetryConfiguration()); + return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theParameters.getChannelName(), + settings); } - protected IChannelProducer newSendingChannel(String theChannelName) { - return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theChannelName, null); + protected IChannelProducer newSendingChannel(ProducingChannelParameters theParameters) { + ChannelProducerSettings settings = new ChannelProducerSettings(); + settings.setRetryConfiguration(theParameters.getRetryConfiguration()); + return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), + settings); } public synchronized void remove(ActiveSubscription theActiveSubscription) { String channelName = theActiveSubscription.getChannelName(); ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName); boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); + ChannelRetryConfiguration retryConfig = theActiveSubscription.getRetryConfigurationParameters(); + if (!removed) { ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java index 52d378129ab..27c1b541987 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/BaseSubscriptionDeliverySubscriber.java @@ -24,10 +24,10 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; -import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; -import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java index 2020d582798..2301d536071 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchingSubscriber.java @@ -73,7 +73,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { super(); } - @Override public void handleMessage(@Nonnull Message theMessage) throws MessagingException { ourLog.trace("Handling resource modified message: {}", theMessage); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscription.java index 8071a01366a..f9f71d6c899 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscription.java @@ -23,15 +23,19 @@ package ca.uhn.fhir.jpa.subscription.match.registry; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; public class ActiveSubscription { private SubscriptionCriteriaParser.SubscriptionCriteria myCriteria; + private final String myChannelName; private final String myId; private CanonicalSubscription mySubscription; private boolean flagForDeletion; + private ChannelRetryConfiguration myRetryConfigurationParameters; + public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) { myChannelName = theChannelName; myId = theSubscription.getIdPart(); @@ -70,4 +74,12 @@ public class ActiveSubscription { public CanonicalSubscriptionChannelType getChannelType() { return mySubscription.getChannelType(); } + + public void setRetryConfiguration(ChannelRetryConfiguration theParams) { + myRetryConfigurationParameters = theParams; + } + + public ChannelRetryConfiguration getRetryConfigurationParameters() { + return myRetryConfigurationParameters; + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java index b1908efe866..4ffba792bcb 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java @@ -26,6 +26,8 @@ import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; +import ca.uhn.fhir.util.HapiExtensions; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -80,17 +82,48 @@ public class SubscriptionRegistry { return activeSubscription.map(ActiveSubscription::getSubscription); } - private void registerSubscription(IIdType theId, IBaseResource theSubscription) { + /** + * Extracts the retry configuration settings from the CanonicalSubscription object. + * + * Returns the configuration, or null, if no retry (or a bad retry value) + * is specified. + * + * @param theSubscription + * @return + */ + private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(CanonicalSubscription theSubscription) { + ChannelRetryConfiguration configuration = new ChannelRetryConfiguration(); + + List retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT); + if (retryCount.size() == 1) { + String val = retryCount.get(0); + configuration.setRetryCount(Integer.parseInt(val)); + } + // else - 0 or more than 1 means no retry policy at all + + // retry count is required for any retry policy + if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) { + configuration = null; + } + + return configuration; + } + + private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) { Validate.notNull(theId); String subscriptionId = theId.getIdPart(); Validate.notBlank(subscriptionId); - Validate.notNull(theSubscription); + Validate.notNull(theCanonicalSubscription); - CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); + String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); - String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized); + // get the actual retry configuration + ChannelRetryConfiguration configuration = getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription); - ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName); + ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName); + activeSubscription.setRetryConfiguration(configuration); + + // add to our registries mySubscriptionChannelRegistry.add(activeSubscription); myActiveSubscriptionCache.put(subscriptionId, activeSubscription); @@ -98,9 +131,8 @@ public class SubscriptionRegistry { // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED HookParams params = new HookParams() - .add(CanonicalSubscription.class, canonicalized); + .add(CanonicalSubscription.class, theCanonicalSubscription); myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); - } public void unregisterSubscriptionIfRegistered(String theSubscriptionId) { @@ -114,7 +146,6 @@ public class SubscriptionRegistry { // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED HookParams params = new HookParams(); myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params); - } } @@ -135,6 +166,7 @@ public class SubscriptionRegistry { } public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { + Validate.notNull(theSubscription); Optional existingSubscription = hasSubscription(theSubscription.getIdElement()); CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); @@ -152,7 +184,7 @@ public class SubscriptionRegistry { unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart()); } if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { - registerSubscription(theSubscription.getIdElement(), theSubscription); + registerSubscription(theSubscription.getIdElement(), newSubscription); return true; } else { return false; diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java new file mode 100644 index 00000000000..eea1b817077 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistryTest.java @@ -0,0 +1,107 @@ +package ca.uhn.fhir.jpa.subscription.channel.subscription; + +import ca.uhn.fhir.jpa.subscription.channel.api.BaseChannelSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; +import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.util.Optional; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionChannelRegistryTest { + + @Mock + private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + + @Mock + private SubscriptionChannelFactory mySubscriptionChannelFactory; + + @InjectMocks + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; + + private ActiveSubscription createActiveSubscription(String theChannelName, int theRetryCount) { + CanonicalSubscription subscription = new CanonicalSubscription(); + subscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK); + ChannelRetryConfiguration configuration = new ChannelRetryConfiguration(); + configuration.setRetryCount(theRetryCount); + ActiveSubscription activeSubscription = new ActiveSubscription(subscription, theChannelName); + activeSubscription.setRetryConfiguration(configuration); + return activeSubscription; + } + + @Test + public void add_subscriptionWithRetryConfigs_createsSendingAndReceivingChannelsWithRetryConfigs() { + int retryCount = 5; + String channelName = "test"; + ActiveSubscription activeSubscription = createActiveSubscription(channelName, retryCount); + + // mocks + MessageHandler messageHandler = Mockito.mock(MessageHandler.class); + IChannelReceiver receiver = Mockito.mock(IChannelReceiver.class); + IChannelProducer producer = Mockito.mock(IChannelProducer.class); + + // when + Mockito.when(mySubscriptionChannelFactory.newDeliveryReceivingChannel( + Mockito.anyString(), + Mockito.any(ChannelConsumerSettings.class) + )).thenReturn(receiver); + Mockito.when(mySubscriptionChannelFactory.newDeliverySendingChannel( + Mockito.anyString(), + Mockito.any(ChannelProducerSettings.class) + )).thenReturn(producer); + Mockito.when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(Mockito.any(CanonicalSubscriptionChannelType.class))) + .thenReturn(Optional.of(messageHandler)); + + // test + mySubscriptionChannelRegistry.add(activeSubscription); + + // verify + // the receiver and sender should've been added to the maps + SubscriptionChannelWithHandlers receiverChannel = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(channelName); + MessageChannel senderChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(channelName); + + Assertions.assertEquals(producer, senderChannel); + Assertions.assertEquals(receiver, receiverChannel.getChannel()); + + // verify the creation of the sender/receiver + // both have retry values provided + ArgumentCaptor consumerCaptor = ArgumentCaptor.forClass(ChannelConsumerSettings.class); + Mockito.verify(mySubscriptionChannelFactory) + .newDeliveryReceivingChannel(Mockito.anyString(), + consumerCaptor.capture()); + ChannelConsumerSettings consumerSettings = consumerCaptor.getValue(); + verifySettingsHaveRetryConfig(consumerSettings, retryCount); + + ArgumentCaptor producerCaptor = ArgumentCaptor.forClass(ChannelProducerSettings.class); + Mockito.verify(mySubscriptionChannelFactory) + .newDeliverySendingChannel(Mockito.anyString(), + producerCaptor.capture()); + verifySettingsHaveRetryConfig(producerCaptor.getValue(), retryCount); + } + + /** + * Verifies the retry configs for the channel + * @param theSettings + * @param theRetryCount + */ + private void verifySettingsHaveRetryConfig(BaseChannelSettings theSettings, int theRetryCount) { + Assertions.assertNotNull(theSettings); + Assertions.assertNotNull(theSettings.getRetryConfigurationParameters()); + Assertions.assertEquals(theRetryCount, theSettings.getRetryConfigurationParameters().getRetryCount()); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistryTest.java new file mode 100644 index 00000000000..af9fe41c27e --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistryTest.java @@ -0,0 +1,180 @@ +package ca.uhn.fhir.jpa.subscription.match.registry; + +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; +import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.util.HapiExtensions; +import org.hl7.fhir.r4.model.Extension; +import org.hl7.fhir.r4.model.IntegerType; +import org.hl7.fhir.r4.model.Subscription; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionRegistryTest { + + @Mock + private SubscriptionCanonicalizer mySubscriptionCanonicalizer; + + @Mock + private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; + + @Mock + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; + + @Mock + private IInterceptorBroadcaster myInterceptorBroadcaster; + + @InjectMocks + private SubscriptionRegistry mySubscriptionRegistry; + + private Subscription createSubscription(Extension... theExtensions) { + Subscription subscription = new Subscription(); + subscription.setId("123"); + subscription.setCriteria("Patient"); + subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); + Subscription.SubscriptionChannelComponent channel + = new Subscription.SubscriptionChannelComponent(); + channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload("application/json"); + channel.setEndpoint("http://unused.test.endpoint/"); + subscription.setChannel(channel); + + if (theExtensions != null) { + for (Extension ex : theExtensions) { + channel.addExtension(ex); + } + } + return subscription; + } + + private CanonicalSubscription getCanonicalSubscriptionFromSubscription(Subscription theSubscription) { + CanonicalSubscription subscription = new CanonicalSubscription(); + subscription.setStatus(theSubscription.getStatus()); + subscription.setCriteriaString(theSubscription.getCriteria()); + + Subscription.SubscriptionChannelComponent channel = theSubscription.getChannel(); + HashMap> extensions = new HashMap>(); + + for (Extension ex : channel.getExtension()) { + if (!extensions.containsKey(ex.getUrl())) { + extensions.put(ex.getUrl(), new ArrayList<>()); + } + extensions.get(ex.getUrl()).add(ex.getValueAsPrimitive().getValueAsString()); + } + subscription.setChannelExtensions(extensions); + + return subscription; + } + + /** + * Will mock the subscription canonicalizer with the provided subscription + * and the channel namer with the provided name. + * + * @param theSubscription + * @param theName + */ + private void mockSubscriptionCanonicalizerAndChannelNamer(Subscription theSubscription, String theName) { + Mockito.when(mySubscriptionCanonicalizer.canonicalize(Mockito.any(Subscription.class))) + .thenReturn(getCanonicalSubscriptionFromSubscription(theSubscription)); + Mockito.when(mySubscriptionDeliveryChannelNamer.nameFromSubscription(Mockito.any(CanonicalSubscription.class))) + .thenReturn(theName); + } + + /** + * Verifies an ActiveSubscription was registered, and passes it back + * for further verification. + * Also verifies that the interceptor was called. + */ + private ActiveSubscription verifySubscriptionIsRegistered() { + ArgumentCaptor subscriptionArgumentCaptor = ArgumentCaptor.forClass(ActiveSubscription.class); + Mockito.verify(mySubscriptionChannelRegistry) + .add(subscriptionArgumentCaptor.capture()); + Mockito.verify(myInterceptorBroadcaster) + .callHooks(Mockito.any(Pointcut.class), Mockito.any(HookParams.class)); + return subscriptionArgumentCaptor.getValue(); + } + + @Test + public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithRetry_createsAsExpected() { + // init + String channelName = "subscription-test"; + int retryCount = 2; + + Extension retryExtension = new Extension(); + retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT); + retryExtension.setValue(new IntegerType(retryCount)); + + Subscription subscription = createSubscription(retryExtension); + + // when + mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName); + + // test + boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + + // verify + Assertions.assertTrue(registered); + ActiveSubscription activeSubscription = verifySubscriptionIsRegistered(); + Assertions.assertNotNull(activeSubscription.getRetryConfigurationParameters()); + Assertions.assertEquals(channelName, activeSubscription.getChannelName()); + Assertions.assertEquals(retryCount, activeSubscription.getRetryConfigurationParameters().getRetryCount()); + } + + @Test + public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithoutRetry_createsAsExpected() { + // init + String channelName = "subscription-test"; + + Subscription subscription = createSubscription(); + + // when + mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName); + + // test + boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + + // verify + Assertions.assertTrue(created); + ActiveSubscription activeSubscription = verifySubscriptionIsRegistered(); + Assertions.assertNull(activeSubscription.getRetryConfigurationParameters()); + } + + @Test + public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithBadRetry_createsAsExpected() { + // init + String channelName = "subscription-test"; + int retryCount = -1; // invalid retry count -> no retries created + + Extension retryExtension = new Extension(); + retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT); + retryExtension.setValue(new IntegerType(retryCount)); + + Subscription subscription = createSubscription(retryExtension); + + // when + mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName); + + // test + boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + + // verify + Assertions.assertTrue(created); + ActiveSubscription activeSubscription = verifySubscriptionIsRegistered(); + Assertions.assertNull(activeSubscription.getRetryConfigurationParameters()); + Assertions.assertEquals(channelName, activeSubscription.getChannelName()); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java index 45ae69d9bfa..0e4f8e079ac 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java @@ -5,6 +5,11 @@ import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; @@ -67,6 +72,11 @@ public abstract class BaseSubscriptionTest { return new DaoConfig(); } + @Bean + public IChannelFactory channelFactory(IChannelNamer theNamer) { + return new LinkedBlockingChannelFactory(theNamer); + } + @Bean public SubscriptionChannelFactory mySubscriptionChannelFactory(IChannelNamer theChannelNamer) { return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer)); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java index 9575746719f..281a48f1e24 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java @@ -23,6 +23,20 @@ public abstract class BaseSubscriptionRegistryTest extends BaseSubscriptionDstu3 return subscription; } + protected org.hl7.fhir.r4.model.Subscription createSubscriptionR4() { + org.hl7.fhir.r4.model.Subscription subscription = new org.hl7.fhir.r4.model.Subscription(); + subscription.setId(SUBSCRIPTION_ID); + subscription.setCriteria(ORIG_CRITERIA); + subscription.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.ACTIVE); + org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent channel + = new org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent(); + channel.setType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload("application/json"); + channel.setEndpoint("http://unused.test.endpoint/"); + subscription.setChannel(channel); + return subscription; + } + protected void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) { Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); channel.setType(theResthook); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java index ecedb13f8f1..272165044a5 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java @@ -4,9 +4,12 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import org.hl7.fhir.dstu3.model.Subscription; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest { + @Test public void updateSubscriptionReusesActiveSubscription() { Subscription subscription = createSubscription(); diff --git a/hapi-fhir-jpaserver-test-utilities/pom.xml b/hapi-fhir-jpaserver-test-utilities/pom.xml index c4cbddbdb44..c6d61a3a3e5 100644 --- a/hapi-fhir-jpaserver-test-utilities/pom.xml +++ b/hapi-fhir-jpaserver-test-utilities/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-jpaserver-uhnfhirtest/pom.xml b/hapi-fhir-jpaserver-uhnfhirtest/pom.xml index 6bbbf48b420..b7772c76fb8 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/pom.xml +++ b/hapi-fhir-jpaserver-uhnfhirtest/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java index 210d0c2bf13..c352a6ac533 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java @@ -2,7 +2,6 @@ package ca.uhn.fhirtest.config; import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.model.config.PartitionSettings; -import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig; import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig; import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig; import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig; diff --git a/hapi-fhir-server-mdm/pom.xml b/hapi-fhir-server-mdm/pom.xml index d26d558b9d0..756e3198731 100644 --- a/hapi-fhir-server-mdm/pom.xml +++ b/hapi-fhir-server-mdm/pom.xml @@ -7,7 +7,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-server-openapi/pom.xml b/hapi-fhir-server-openapi/pom.xml index 7d2e17e67ef..c14b81ea1a7 100644 --- a/hapi-fhir-server-openapi/pom.xml +++ b/hapi-fhir-server-openapi/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-server/pom.xml b/hapi-fhir-server/pom.xml index 4cffc75c07e..a56089dd871 100644 --- a/hapi-fhir-server/pom.xml +++ b/hapi-fhir-server/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml index d375b423234..e37dc04b56d 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-apache/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-apache/pom.xml index 3db2b6a61fd..967aeee0e87 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-apache/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-apache/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir-spring-boot-samples - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT hapi-fhir-spring-boot-sample-client-apache diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-okhttp/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-okhttp/pom.xml index 9ffdbc96316..f5017520290 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-okhttp/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-client-okhttp/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir-spring-boot-samples - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT hapi-fhir-spring-boot-sample-client-okhttp diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-server-jersey/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-server-jersey/pom.xml index 214dd281b28..a9f32ab08d1 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-server-jersey/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/hapi-fhir-spring-boot-sample-server-jersey/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir-spring-boot-samples - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT hapi-fhir-spring-boot-sample-server-jersey diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml index 106e4583d3b..ce76faec06d 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-samples/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir-spring-boot - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT hapi-fhir-spring-boot-samples diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml index 5caa4392be2..3d1b93f4296 100644 --- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml +++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-spring-boot/pom.xml b/hapi-fhir-spring-boot/pom.xml index 1618267c957..8bd2eac8a15 100644 --- a/hapi-fhir-spring-boot/pom.xml +++ b/hapi-fhir-spring-boot/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-sql-migrate/pom.xml b/hapi-fhir-sql-migrate/pom.xml index a932868f029..fa8f6f66817 100644 --- a/hapi-fhir-sql-migrate/pom.xml +++ b/hapi-fhir-sql-migrate/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-storage/pom.xml b/hapi-fhir-storage/pom.xml index dbaa1f37948..376493a9674 100644 --- a/hapi-fhir-storage/pom.xml +++ b/hapi-fhir-storage/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/BaseChannelSettings.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/BaseChannelSettings.java index 083576e457f..21878655b85 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/BaseChannelSettings.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/BaseChannelSettings.java @@ -20,9 +20,14 @@ package ca.uhn.fhir.jpa.subscription.channel.api; * #L% */ +import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; + public abstract class BaseChannelSettings implements IChannelSettings { private boolean myQualifyChannelName = true; + private ChannelRetryConfiguration myRetryConfigurationParameters; + + /** * Default true. Used by IChannelNamer to decide how to qualify the channel name. */ @@ -37,4 +42,12 @@ public abstract class BaseChannelSettings implements IChannelSettings { public void setQualifyChannelName(boolean theQualifyChannelName) { myQualifyChannelName = theQualifyChannelName; } + + public void setRetryConfiguration(ChannelRetryConfiguration theParams) { + myRetryConfigurationParameters = theParams; + } + + public ChannelRetryConfiguration getRetryConfigurationParameters() { + return myRetryConfigurationParameters; + } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java index dbe3b50a29b..a589b83bf5e 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannelFactory.java @@ -48,7 +48,7 @@ public class LinkedBlockingChannelFactory implements IChannelFactory { private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class); private final IChannelNamer myChannelNamer; - private Map myChannels = Collections.synchronizedMap(new HashMap<>()); + private final Map myChannels = Collections.synchronizedMap(new HashMap<>()); public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer) { myChannelNamer = theChannelNamer; @@ -69,7 +69,10 @@ public class LinkedBlockingChannelFactory implements IChannelFactory { return myChannelNamer; } - private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) { + private LinkedBlockingChannel getOrCreateChannel(String theChannelName, + int theConcurrentConsumers, + IChannelSettings theChannelSettings) { + // TODO - does this need retry settings? final String channelName = myChannelNamer.getChannelName(theChannelName, theChannelSettings); return myChannels.computeIfAbsent(channelName, t -> { diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java index 1ffeca5fb7b..c73386fdc2e 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java @@ -67,7 +67,6 @@ public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribable myWrappedChannel.addInterceptor(interceptor); } - @Override public String getName() { return myWrappedChannel.getName(); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index df4dad8c206..23a8bcb0e71 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -43,6 +43,7 @@ public class SubscriptionChannelFactory { public IChannelProducer newDeliverySendingChannel(String theChannelName, ChannelProducerSettings theChannelSettings) { ChannelProducerSettings config = newProducerConfigForDeliveryChannel(theChannelSettings); + config.setRetryConfiguration(theChannelSettings.getRetryConfigurationParameters()); return myChannelFactory.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config); } @@ -66,17 +67,24 @@ public class SubscriptionChannelFactory { protected ChannelProducerSettings newProducerConfigForDeliveryChannel(ChannelProducerSettings theOptions) { ChannelProducerSettings config = new ChannelProducerSettings(); config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers()); + config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); return config; } protected ChannelConsumerSettings newConsumerConfigForDeliveryChannel(ChannelConsumerSettings theOptions) { ChannelConsumerSettings config = new ChannelConsumerSettings(); config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers()); + if (theOptions != null) { + config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); + } return config; } protected ChannelProducerSettings newProducerConfigForMatchingChannel(ChannelProducerSettings theOptions) { ChannelProducerSettings config = new ChannelProducerSettings(); + if (theOptions != null) { + config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); + } config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers()); return config; } @@ -84,6 +92,9 @@ public class SubscriptionChannelFactory { protected ChannelConsumerSettings newConsumerConfigForMatchingChannel(ChannelConsumerSettings theOptions) { ChannelConsumerSettings config = new ChannelConsumerSettings(); config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers()); + if (theOptions != null) { + config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); + } return config; } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java index a81423f6fb6..c6e45cfad50 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java @@ -174,7 +174,10 @@ public class SubscriptionCanonicalizer { .getChannel() .getExtension() .stream() - .collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList()))); + .collect(Collectors.groupingBy(t -> t.getUrl(), + mapping(t -> { + return t.getValueAsPrimitive().getValueAsString(); + }, toList()))); } case R5: { org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ChannelRetryConfiguration.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ChannelRetryConfiguration.java new file mode 100644 index 00000000000..7586a55e288 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/model/ChannelRetryConfiguration.java @@ -0,0 +1,16 @@ +package ca.uhn.fhir.jpa.subscription.model; + +public class ChannelRetryConfiguration { + /** + * Number of times to retry a failed message. + */ + private Integer myRetryCount; + + public void setRetryCount(int theRetryCount) { + myRetryCount = theRetryCount; + } + + public Integer getRetryCount() { + return myRetryCount; + } +} diff --git a/hapi-fhir-structures-dstu2.1/pom.xml b/hapi-fhir-structures-dstu2.1/pom.xml index 7ca4259d970..aa61e6feb06 100644 --- a/hapi-fhir-structures-dstu2.1/pom.xml +++ b/hapi-fhir-structures-dstu2.1/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-structures-dstu2/pom.xml b/hapi-fhir-structures-dstu2/pom.xml index 0853df44484..cbf1e586e60 100644 --- a/hapi-fhir-structures-dstu2/pom.xml +++ b/hapi-fhir-structures-dstu2/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-structures-dstu3/pom.xml b/hapi-fhir-structures-dstu3/pom.xml index a64bdfa034e..46c61f7e7b7 100644 --- a/hapi-fhir-structures-dstu3/pom.xml +++ b/hapi-fhir-structures-dstu3/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-structures-hl7org-dstu2/pom.xml b/hapi-fhir-structures-hl7org-dstu2/pom.xml index c9134d85e45..ffdb8a25e3b 100644 --- a/hapi-fhir-structures-hl7org-dstu2/pom.xml +++ b/hapi-fhir-structures-hl7org-dstu2/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-structures-r4/pom.xml b/hapi-fhir-structures-r4/pom.xml index 37ca311d2ad..8e3c377ce18 100644 --- a/hapi-fhir-structures-r4/pom.xml +++ b/hapi-fhir-structures-r4/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-structures-r5/pom.xml b/hapi-fhir-structures-r5/pom.xml index 2906af87357..bb629f82887 100644 --- a/hapi-fhir-structures-r5/pom.xml +++ b/hapi-fhir-structures-r5/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-test-utilities/pom.xml b/hapi-fhir-test-utilities/pom.xml index 0ee7da0ade3..ed58547d51c 100644 --- a/hapi-fhir-test-utilities/pom.xml +++ b/hapi-fhir-test-utilities/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-test-utilities/src/main/java/ca/uhn/fhir/test/utilities/server/HashMapResourceProviderExtension.java b/hapi-fhir-test-utilities/src/main/java/ca/uhn/fhir/test/utilities/server/HashMapResourceProviderExtension.java index d99dcb91f14..663c57c6223 100644 --- a/hapi-fhir-test-utilities/src/main/java/ca/uhn/fhir/test/utilities/server/HashMapResourceProviderExtension.java +++ b/hapi-fhir-test-utilities/src/main/java/ca/uhn/fhir/test/utilities/server/HashMapResourceProviderExtension.java @@ -59,13 +59,6 @@ public class HashMapResourceProviderExtension extends H myRestfulServerExtension.getRestfulServer().unregisterProvider(HashMapResourceProviderExtension.this); } - @Override - public synchronized MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) { - T resourceClone = getFhirContext().newTerser().clone(theResource); - myUpdates.add(resourceClone); - return super.update(theResource, theConditional, theRequestDetails); - } - @Override public synchronized void clear() { super.clear(); @@ -83,12 +76,17 @@ public class HashMapResourceProviderExtension extends H myRestfulServerExtension.getRestfulServer().registerProvider(HashMapResourceProviderExtension.this); } + public synchronized MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) { + T resourceClone = getFhirContext().newTerser().clone(theResource); + myUpdates.add(resourceClone); + return super.update(theResource, theConditional, theRequestDetails); + } + public HashMapResourceProviderExtension dontClearBetweenTests() { myClearBetweenTests = false; return this; } - public void waitForUpdateCount(long theCount) { assertThat(theCount, greaterThanOrEqualTo(getCountUpdate())); await().until(()->getCountUpdate(), equalTo(theCount)); diff --git a/hapi-fhir-testpage-overlay/pom.xml b/hapi-fhir-testpage-overlay/pom.xml index fd925796d97..d30e613223b 100644 --- a/hapi-fhir-testpage-overlay/pom.xml +++ b/hapi-fhir-testpage-overlay/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/hapi-fhir-validation-resources-dstu2.1/pom.xml b/hapi-fhir-validation-resources-dstu2.1/pom.xml index f3e80c59752..a086cd9678a 100644 --- a/hapi-fhir-validation-resources-dstu2.1/pom.xml +++ b/hapi-fhir-validation-resources-dstu2.1/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-validation-resources-dstu2/pom.xml b/hapi-fhir-validation-resources-dstu2/pom.xml index 0b31d9fa7cc..81bef49e782 100644 --- a/hapi-fhir-validation-resources-dstu2/pom.xml +++ b/hapi-fhir-validation-resources-dstu2/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-validation-resources-dstu3/pom.xml b/hapi-fhir-validation-resources-dstu3/pom.xml index 29bdf353708..a5e55f6f015 100644 --- a/hapi-fhir-validation-resources-dstu3/pom.xml +++ b/hapi-fhir-validation-resources-dstu3/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-validation-resources-r4/pom.xml b/hapi-fhir-validation-resources-r4/pom.xml index 14e00ce7526..7cc5fe4e292 100644 --- a/hapi-fhir-validation-resources-r4/pom.xml +++ b/hapi-fhir-validation-resources-r4/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-validation-resources-r5/pom.xml b/hapi-fhir-validation-resources-r5/pom.xml index 1c7c3916c0d..21a2db0b954 100644 --- a/hapi-fhir-validation-resources-r5/pom.xml +++ b/hapi-fhir-validation-resources-r5/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-fhir-validation/pom.xml b/hapi-fhir-validation/pom.xml index 40ceca3b336..dfd33d8cc8b 100644 --- a/hapi-fhir-validation/pom.xml +++ b/hapi-fhir-validation/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-deployable-pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../hapi-deployable-pom/pom.xml diff --git a/hapi-tinder-plugin/pom.xml b/hapi-tinder-plugin/pom.xml index a323960c7c1..fbaa166775c 100644 --- a/hapi-tinder-plugin/pom.xml +++ b/hapi-tinder-plugin/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml @@ -58,37 +58,37 @@ ca.uhn.hapi.fhir hapi-fhir-structures-dstu3 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-structures-hl7org-dstu2 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-structures-r4 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-structures-r5 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-validation-resources-dstu2 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-validation-resources-dstu3 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ca.uhn.hapi.fhir hapi-fhir-validation-resources-r4 - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT org.apache.velocity diff --git a/hapi-tinder-test/pom.xml b/hapi-tinder-test/pom.xml index aa149ab2270..583d6331853 100644 --- a/hapi-tinder-test/pom.xml +++ b/hapi-tinder-test/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 4532bd38464..17fb0846fef 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-fhir pom - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT HAPI-FHIR An open-source implementation of the FHIR specification in Java. https://hapifhir.io diff --git a/tests/hapi-fhir-base-test-jaxrsserver-kotlin/pom.xml b/tests/hapi-fhir-base-test-jaxrsserver-kotlin/pom.xml index 1b1ddcaf0e2..1855b1ef816 100644 --- a/tests/hapi-fhir-base-test-jaxrsserver-kotlin/pom.xml +++ b/tests/hapi-fhir-base-test-jaxrsserver-kotlin/pom.xml @@ -6,7 +6,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../pom.xml diff --git a/tests/hapi-fhir-base-test-mindeps-client/pom.xml b/tests/hapi-fhir-base-test-mindeps-client/pom.xml index 2da1c0bab39..4193c4cb7c9 100644 --- a/tests/hapi-fhir-base-test-mindeps-client/pom.xml +++ b/tests/hapi-fhir-base-test-mindeps-client/pom.xml @@ -4,7 +4,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../pom.xml diff --git a/tests/hapi-fhir-base-test-mindeps-server/pom.xml b/tests/hapi-fhir-base-test-mindeps-server/pom.xml index d81ed3c9a93..ba2830c5882 100644 --- a/tests/hapi-fhir-base-test-mindeps-server/pom.xml +++ b/tests/hapi-fhir-base-test-mindeps-server/pom.xml @@ -5,7 +5,7 @@ ca.uhn.hapi.fhir hapi-fhir - 5.7.0-PRE1-SNAPSHOT + 5.7.0-PRE2-SNAPSHOT ../../pom.xml