Issue 1134 retry mechanic on subscriptions (#3121)

* issue-1134 initial work on adding retry handler

* issue-1134 subdscription creation fixes

* issue-1134 update the test

* issue-1134 retry policy off canonicalsubscription

* issue 1134 base subscription changes

* add failing test

* issue 1134 retry configs on base parameters

* issue 1134 passing forward params

* issue-1134 added more tests for making sure subscriptions create their channels correctly

* issue-1134 updates to get the retry to channels

* issue-1134 updating channel factory

* issue-1134 remove the dlq since it's not defineable

* issue-3120 changelog added

* issue-3120 cleaning up

* issue-3120 test fix

* issue-3120 review fixes

* issue-3120 fixed bad master merge

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
Co-authored-by: Ken Stevens <khstevens@gmail.com>
This commit is contained in:
TipzCM 2021-10-29 11:21:43 -04:00 committed by GitHub
parent 8a7b5c1b2f
commit 9b23e9e825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 614 additions and 103 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -122,6 +122,14 @@ public class HapiExtensions {
*/
public static final String EXT_OP_PARAMETER_EXAMPLE_VALUE = "http://hapifhir.io/fhir/StructureDefinition/op-parameter-example-value";
/**
* This extension provides a way for subscribers to provide
* a "retry-count".
* If provided, subscriptions will be retried this many times
* (to a total of retry-count + 1 (for original attempt)
*/
public static final String EX_RETRY_COUNT = "http://hapifhir.io/fhir/StructureDefinition/subscription-delivery-retry-count";
/**
* Non instantiable
*/

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,5 @@
---
type: add
issue: 3120
title: "Added http://hapifhir.io/fhir/StructureDefinition/subscription-delivery-retry-count extension that can be provided
to a subscription to define a specific retry strategy (retry retry-count number of times before giving up)."

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.jpa.subscription.channel.models;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
public class BaseChannelParameters {
private final String myChannelName;
private ChannelRetryConfiguration myRetryConfiguration;
/**
* Constructor
*/
public BaseChannelParameters(String theChannelName) {
myChannelName = theChannelName;
}
public String getChannelName() {
return myChannelName;
}
public void setRetryConfiguration(ChannelRetryConfiguration theConfiguration) {
myRetryConfiguration = theConfiguration;
}
public ChannelRetryConfiguration getRetryConfiguration() {
return myRetryConfiguration;
}
}

View File

@ -0,0 +1,16 @@
package ca.uhn.fhir.jpa.subscription.channel.models;
public class ProducingChannelParameters extends BaseChannelParameters {
/**
* Constructor
*
* Producing channels are sending channels. They send data to topics/queues.
*
* @param theChannelName
*/
public ProducingChannelParameters(String theChannelName) {
super(theChannelName);
}
}

View File

@ -0,0 +1,15 @@
package ca.uhn.fhir.jpa.subscription.channel.models;
public class ReceivingChannelParameters extends BaseChannelParameters {
/**
* Constructor
*
* Receiving channels are channels that receive data from topics/queues
*
* @param theChannelName
*/
public ReceivingChannelParameters(String theChannelName) {
super(theChannelName);
}
}

View File

@ -20,10 +20,15 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters;
import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
@ -60,29 +65,63 @@ public class SubscriptionChannelRegistry {
return;
}
IChannelReceiver channelReceiver = newReceivingChannel(channelName);
// we get the retry configurations from the cannonicalized subscriber
// these will be provided to both the producer and receiver channel
ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters();
/*
* When we create a subscription, we create both
* a producing/sending channel and
* a receiving channel.
*
* Matched subscriptions are sent to the Sending channel
* and the sending channel sends to subscription matching service.
*
* Receiving channel will send it out to
* the subscriber hook (REST, email, etc).
*/
// the receiving channel
// this sends to the hook (resthook/message/email/whatever)
ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName);
receivingParameters.setRetryConfiguration(retryConfigParameters);
IChannelReceiver channelReceiver = newReceivingChannel(receivingParameters);
Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, channelReceiver);
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers);
IChannelProducer sendingChannel = newSendingChannel(channelName);
// create the producing channel.
// channel used for sending to subscription matcher
ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName);
producingChannelParameters.setRetryConfiguration(retryConfigParameters);
IChannelProducer sendingChannel = newSendingChannel(producingChannelParameters);
myChannelNameToSender.put(channelName, sendingChannel);
}
protected IChannelReceiver newReceivingChannel(String theChannelName) {
return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theChannelName, null);
protected IChannelReceiver newReceivingChannel(ReceivingChannelParameters theParameters) {
ChannelConsumerSettings settings = new ChannelConsumerSettings();
settings.setRetryConfiguration(theParameters.getRetryConfiguration());
return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theParameters.getChannelName(),
settings);
}
protected IChannelProducer newSendingChannel(String theChannelName) {
return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theChannelName, null);
protected IChannelProducer newSendingChannel(ProducingChannelParameters theParameters) {
ChannelProducerSettings settings = new ChannelProducerSettings();
settings.setRetryConfiguration(theParameters.getRetryConfiguration());
return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(),
settings);
}
public synchronized void remove(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
ChannelRetryConfiguration retryConfig = theActiveSubscription.getRetryConfigurationParameters();
if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
}

View File

@ -24,10 +24,10 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -73,7 +73,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
super();
}
@Override
public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
ourLog.trace("Handling resource modified message: {}", theMessage);

View File

@ -23,15 +23,19 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
public class ActiveSubscription {
private SubscriptionCriteriaParser.SubscriptionCriteria myCriteria;
private final String myChannelName;
private final String myId;
private CanonicalSubscription mySubscription;
private boolean flagForDeletion;
private ChannelRetryConfiguration myRetryConfigurationParameters;
public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
myChannelName = theChannelName;
myId = theSubscription.getIdPart();
@ -70,4 +74,12 @@ public class ActiveSubscription {
public CanonicalSubscriptionChannelType getChannelType() {
return mySubscription.getChannelType();
}
public void setRetryConfiguration(ChannelRetryConfiguration theParams) {
myRetryConfigurationParameters = theParams;
}
public ChannelRetryConfiguration getRetryConfigurationParameters() {
return myRetryConfigurationParameters;
}
}

View File

@ -26,6 +26,8 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import ca.uhn.fhir.util.HapiExtensions;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -80,17 +82,48 @@ public class SubscriptionRegistry {
return activeSubscription.map(ActiveSubscription::getSubscription);
}
private void registerSubscription(IIdType theId, IBaseResource theSubscription) {
/**
* Extracts the retry configuration settings from the CanonicalSubscription object.
*
* Returns the configuration, or null, if no retry (or a bad retry value)
* is specified.
*
* @param theSubscription
* @return
*/
private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(CanonicalSubscription theSubscription) {
ChannelRetryConfiguration configuration = new ChannelRetryConfiguration();
List<String> retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT);
if (retryCount.size() == 1) {
String val = retryCount.get(0);
configuration.setRetryCount(Integer.parseInt(val));
}
// else - 0 or more than 1 means no retry policy at all
// retry count is required for any retry policy
if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) {
configuration = null;
}
return configuration;
}
private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) {
Validate.notNull(theId);
String subscriptionId = theId.getIdPart();
Validate.notBlank(subscriptionId);
Validate.notNull(theSubscription);
Validate.notNull(theCanonicalSubscription);
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);
String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized);
// get the actual retry configuration
ChannelRetryConfiguration configuration = getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription);
ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName);
ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName);
activeSubscription.setRetryConfiguration(configuration);
// add to our registries
mySubscriptionChannelRegistry.add(activeSubscription);
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
@ -98,9 +131,8 @@ public class SubscriptionRegistry {
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, canonicalized);
.add(CanonicalSubscription.class, theCanonicalSubscription);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
}
public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
@ -114,7 +146,6 @@ public class SubscriptionRegistry {
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED
HookParams params = new HookParams();
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params);
}
}
@ -135,6 +166,7 @@ public class SubscriptionRegistry {
}
public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
Validate.notNull(theSubscription);
Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
@ -152,7 +184,7 @@ public class SubscriptionRegistry {
unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
}
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
registerSubscription(theSubscription.getIdElement(), theSubscription);
registerSubscription(theSubscription.getIdElement(), newSubscription);
return true;
} else {
return false;

View File

@ -0,0 +1,107 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.api.BaseChannelSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.Optional;
@ExtendWith(MockitoExtension.class)
public class SubscriptionChannelRegistryTest {
@Mock
private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@Mock
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@InjectMocks
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
private ActiveSubscription createActiveSubscription(String theChannelName, int theRetryCount) {
CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK);
ChannelRetryConfiguration configuration = new ChannelRetryConfiguration();
configuration.setRetryCount(theRetryCount);
ActiveSubscription activeSubscription = new ActiveSubscription(subscription, theChannelName);
activeSubscription.setRetryConfiguration(configuration);
return activeSubscription;
}
@Test
public void add_subscriptionWithRetryConfigs_createsSendingAndReceivingChannelsWithRetryConfigs() {
int retryCount = 5;
String channelName = "test";
ActiveSubscription activeSubscription = createActiveSubscription(channelName, retryCount);
// mocks
MessageHandler messageHandler = Mockito.mock(MessageHandler.class);
IChannelReceiver receiver = Mockito.mock(IChannelReceiver.class);
IChannelProducer producer = Mockito.mock(IChannelProducer.class);
// when
Mockito.when(mySubscriptionChannelFactory.newDeliveryReceivingChannel(
Mockito.anyString(),
Mockito.any(ChannelConsumerSettings.class)
)).thenReturn(receiver);
Mockito.when(mySubscriptionChannelFactory.newDeliverySendingChannel(
Mockito.anyString(),
Mockito.any(ChannelProducerSettings.class)
)).thenReturn(producer);
Mockito.when(mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(Mockito.any(CanonicalSubscriptionChannelType.class)))
.thenReturn(Optional.of(messageHandler));
// test
mySubscriptionChannelRegistry.add(activeSubscription);
// verify
// the receiver and sender should've been added to the maps
SubscriptionChannelWithHandlers receiverChannel = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(channelName);
MessageChannel senderChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(channelName);
Assertions.assertEquals(producer, senderChannel);
Assertions.assertEquals(receiver, receiverChannel.getChannel());
// verify the creation of the sender/receiver
// both have retry values provided
ArgumentCaptor<ChannelConsumerSettings> consumerCaptor = ArgumentCaptor.forClass(ChannelConsumerSettings.class);
Mockito.verify(mySubscriptionChannelFactory)
.newDeliveryReceivingChannel(Mockito.anyString(),
consumerCaptor.capture());
ChannelConsumerSettings consumerSettings = consumerCaptor.getValue();
verifySettingsHaveRetryConfig(consumerSettings, retryCount);
ArgumentCaptor<ChannelProducerSettings> producerCaptor = ArgumentCaptor.forClass(ChannelProducerSettings.class);
Mockito.verify(mySubscriptionChannelFactory)
.newDeliverySendingChannel(Mockito.anyString(),
producerCaptor.capture());
verifySettingsHaveRetryConfig(producerCaptor.getValue(), retryCount);
}
/**
* Verifies the retry configs for the channel
* @param theSettings
* @param theRetryCount
*/
private void verifySettingsHaveRetryConfig(BaseChannelSettings theSettings, int theRetryCount) {
Assertions.assertNotNull(theSettings);
Assertions.assertNotNull(theSettings.getRetryConfigurationParameters());
Assertions.assertEquals(theRetryCount, theSettings.getRetryConfigurationParameters().getRetryCount());
}
}

View File

@ -0,0 +1,180 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ExtendWith(MockitoExtension.class)
public class SubscriptionRegistryTest {
@Mock
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Mock
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Mock
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Mock
private IInterceptorBroadcaster myInterceptorBroadcaster;
@InjectMocks
private SubscriptionRegistry mySubscriptionRegistry;
private Subscription createSubscription(Extension... theExtensions) {
Subscription subscription = new Subscription();
subscription.setId("123");
subscription.setCriteria("Patient");
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
Subscription.SubscriptionChannelComponent channel
= new Subscription.SubscriptionChannelComponent();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload("application/json");
channel.setEndpoint("http://unused.test.endpoint/");
subscription.setChannel(channel);
if (theExtensions != null) {
for (Extension ex : theExtensions) {
channel.addExtension(ex);
}
}
return subscription;
}
private CanonicalSubscription getCanonicalSubscriptionFromSubscription(Subscription theSubscription) {
CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setStatus(theSubscription.getStatus());
subscription.setCriteriaString(theSubscription.getCriteria());
Subscription.SubscriptionChannelComponent channel = theSubscription.getChannel();
HashMap<String, List<String>> extensions = new HashMap<String, List<String>>();
for (Extension ex : channel.getExtension()) {
if (!extensions.containsKey(ex.getUrl())) {
extensions.put(ex.getUrl(), new ArrayList<>());
}
extensions.get(ex.getUrl()).add(ex.getValueAsPrimitive().getValueAsString());
}
subscription.setChannelExtensions(extensions);
return subscription;
}
/**
* Will mock the subscription canonicalizer with the provided subscription
* and the channel namer with the provided name.
*
* @param theSubscription
* @param theName
*/
private void mockSubscriptionCanonicalizerAndChannelNamer(Subscription theSubscription, String theName) {
Mockito.when(mySubscriptionCanonicalizer.canonicalize(Mockito.any(Subscription.class)))
.thenReturn(getCanonicalSubscriptionFromSubscription(theSubscription));
Mockito.when(mySubscriptionDeliveryChannelNamer.nameFromSubscription(Mockito.any(CanonicalSubscription.class)))
.thenReturn(theName);
}
/**
* Verifies an ActiveSubscription was registered, and passes it back
* for further verification.
* Also verifies that the interceptor was called.
*/
private ActiveSubscription verifySubscriptionIsRegistered() {
ArgumentCaptor<ActiveSubscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(ActiveSubscription.class);
Mockito.verify(mySubscriptionChannelRegistry)
.add(subscriptionArgumentCaptor.capture());
Mockito.verify(myInterceptorBroadcaster)
.callHooks(Mockito.any(Pointcut.class), Mockito.any(HookParams.class));
return subscriptionArgumentCaptor.getValue();
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
int retryCount = 2;
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(registered);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNotNull(activeSubscription.getRetryConfigurationParameters());
Assertions.assertEquals(channelName, activeSubscription.getChannelName());
Assertions.assertEquals(retryCount, activeSubscription.getRetryConfigurationParameters().getRetryCount());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithoutRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
Subscription subscription = createSubscription();
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(created);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithBadRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
int retryCount = -1; // invalid retry count -> no retries created
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(created);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
Assertions.assertEquals(channelName, activeSubscription.getChannelName());
}
}

View File

@ -5,6 +5,11 @@ import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
@ -67,6 +72,11 @@ public abstract class BaseSubscriptionTest {
return new DaoConfig();
}
@Bean
public IChannelFactory channelFactory(IChannelNamer theNamer) {
return new LinkedBlockingChannelFactory(theNamer);
}
@Bean
public SubscriptionChannelFactory mySubscriptionChannelFactory(IChannelNamer theChannelNamer) {
return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer));

View File

@ -23,6 +23,20 @@ public abstract class BaseSubscriptionRegistryTest extends BaseSubscriptionDstu3
return subscription;
}
protected org.hl7.fhir.r4.model.Subscription createSubscriptionR4() {
org.hl7.fhir.r4.model.Subscription subscription = new org.hl7.fhir.r4.model.Subscription();
subscription.setId(SUBSCRIPTION_ID);
subscription.setCriteria(ORIG_CRITERIA);
subscription.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.ACTIVE);
org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent channel
= new org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent();
channel.setType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload("application/json");
channel.setEndpoint("http://unused.test.endpoint/");
subscription.setChannel(channel);
return subscription;
}
protected void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) {
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(theResthook);

View File

@ -4,9 +4,12 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest {
@Test
public void updateSubscriptionReusesActiveSubscription() {
Subscription subscription = createSubscription();

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -2,7 +2,6 @@ package ca.uhn.fhirtest.config;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -20,9 +20,14 @@ package ca.uhn.fhir.jpa.subscription.channel.api;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
public abstract class BaseChannelSettings implements IChannelSettings {
private boolean myQualifyChannelName = true;
private ChannelRetryConfiguration myRetryConfigurationParameters;
/**
* Default true. Used by IChannelNamer to decide how to qualify the channel name.
*/
@ -37,4 +42,12 @@ public abstract class BaseChannelSettings implements IChannelSettings {
public void setQualifyChannelName(boolean theQualifyChannelName) {
myQualifyChannelName = theQualifyChannelName;
}
public void setRetryConfiguration(ChannelRetryConfiguration theParams) {
myRetryConfigurationParameters = theParams;
}
public ChannelRetryConfiguration getRetryConfigurationParameters() {
return myRetryConfigurationParameters;
}
}

View File

@ -48,7 +48,7 @@ public class LinkedBlockingChannelFactory implements IChannelFactory {
private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class);
private final IChannelNamer myChannelNamer;
private Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
private final Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer) {
myChannelNamer = theChannelNamer;
@ -69,7 +69,10 @@ public class LinkedBlockingChannelFactory implements IChannelFactory {
return myChannelNamer;
}
private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) {
private LinkedBlockingChannel getOrCreateChannel(String theChannelName,
int theConcurrentConsumers,
IChannelSettings theChannelSettings) {
// TODO - does this need retry settings?
final String channelName = myChannelNamer.getChannelName(theChannelName, theChannelSettings);
return myChannels.computeIfAbsent(channelName, t -> {

View File

@ -67,7 +67,6 @@ public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribable
myWrappedChannel.addInterceptor(interceptor);
}
@Override
public String getName() {
return myWrappedChannel.getName();

View File

@ -43,6 +43,7 @@ public class SubscriptionChannelFactory {
public IChannelProducer newDeliverySendingChannel(String theChannelName, ChannelProducerSettings theChannelSettings) {
ChannelProducerSettings config = newProducerConfigForDeliveryChannel(theChannelSettings);
config.setRetryConfiguration(theChannelSettings.getRetryConfigurationParameters());
return myChannelFactory.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config);
}
@ -66,17 +67,24 @@ public class SubscriptionChannelFactory {
protected ChannelProducerSettings newProducerConfigForDeliveryChannel(ChannelProducerSettings theOptions) {
ChannelProducerSettings config = new ChannelProducerSettings();
config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
return config;
}
protected ChannelConsumerSettings newConsumerConfigForDeliveryChannel(ChannelConsumerSettings theOptions) {
ChannelConsumerSettings config = new ChannelConsumerSettings();
config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
if (theOptions != null) {
config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
}
return config;
}
protected ChannelProducerSettings newProducerConfigForMatchingChannel(ChannelProducerSettings theOptions) {
ChannelProducerSettings config = new ChannelProducerSettings();
if (theOptions != null) {
config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
}
config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
return config;
}
@ -84,6 +92,9 @@ public class SubscriptionChannelFactory {
protected ChannelConsumerSettings newConsumerConfigForMatchingChannel(ChannelConsumerSettings theOptions) {
ChannelConsumerSettings config = new ChannelConsumerSettings();
config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
if (theOptions != null) {
config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
}
return config;
}

View File

@ -174,7 +174,10 @@ public class SubscriptionCanonicalizer {
.getChannel()
.getExtension()
.stream()
.collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList())));
.collect(Collectors.groupingBy(t -> t.getUrl(),
mapping(t -> {
return t.getValueAsPrimitive().getValueAsString();
}, toList())));
}
case R5: {
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;

View File

@ -0,0 +1,16 @@
package ca.uhn.fhir.jpa.subscription.model;
public class ChannelRetryConfiguration {
/**
* Number of times to retry a failed message.
*/
private Integer myRetryCount;
public void setRetryCount(int theRetryCount) {
myRetryCount = theRetryCount;
}
public Integer getRetryCount() {
return myRetryCount;
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -59,13 +59,6 @@ public class HashMapResourceProviderExtension<T extends IBaseResource> extends H
myRestfulServerExtension.getRestfulServer().unregisterProvider(HashMapResourceProviderExtension.this);
}
@Override
public synchronized MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) {
T resourceClone = getFhirContext().newTerser().clone(theResource);
myUpdates.add(resourceClone);
return super.update(theResource, theConditional, theRequestDetails);
}
@Override
public synchronized void clear() {
super.clear();
@ -83,12 +76,17 @@ public class HashMapResourceProviderExtension<T extends IBaseResource> extends H
myRestfulServerExtension.getRestfulServer().registerProvider(HashMapResourceProviderExtension.this);
}
public synchronized MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) {
T resourceClone = getFhirContext().newTerser().clone(theResource);
myUpdates.add(resourceClone);
return super.update(theResource, theConditional, theRequestDetails);
}
public HashMapResourceProviderExtension<T> dontClearBetweenTests() {
myClearBetweenTests = false;
return this;
}
public void waitForUpdateCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountUpdate()));
await().until(()->getCountUpdate(), equalTo(theCount));

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@ -310,12 +310,12 @@
<artifactId>jena-arq</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-jpaserver-model</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-jpaserver-model</artifactId>
<version>5.4.0-PRE8-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -58,37 +58,37 @@
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-dstu3</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-hl7org-dstu2</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-r4</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-r5</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-dstu2</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-dstu3</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-r4</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<packaging>pom</packaging>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<name>HAPI-FHIR</name>
<description>An open-source implementation of the FHIR specification in Java.</description>
<url>https://hapifhir.io</url>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>5.6.0-PRE9-SNAPSHOT</version>
<version>5.6.0-PRE10-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>