Merge pull request #1513 from jamesagnew/ks-subscription-delivery-queue-configurable-name

subscription delivery queue configurable name
This commit is contained in:
Ken Stevens 2019-10-03 15:19:12 -04:00 committed by GitHub
commit 38ad11be64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 614 additions and 340 deletions

View File

@ -6,8 +6,8 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.graphql.JpaStorageServices; import ca.uhn.fhir.jpa.graphql.JpaStorageServices;
@ -28,8 +28,8 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices; import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;

View File

@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.config.BaseConfig; import ca.uhn.fhir.jpa.config.BaseConfig;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
@ -233,7 +233,7 @@ public class SubscriptionActivatingInterceptor {
private void submitResourceModified(final ResourceModifiedMessage theMsg) { private void submitResourceModified(final ResourceModifiedMessage theMsg) {
switch (theMsg.getOperationType()) { switch (theMsg.getOperationType()) {
case DELETE: case DELETE:
mySubscriptionRegistry.unregisterSubscription(theMsg.getId(myFhirContext)); mySubscriptionRegistry.unregisterSubscription(theMsg.getId(myFhirContext).getIdPart());
break; break;
case CREATE: case CREATE:
case UPDATE: case UPDATE:

View File

@ -20,10 +20,11 @@ package ca.uhn.fhir.jpa.subscription;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.dstu2.model.Subscription; import org.hl7.fhir.dstu2.model.Subscription;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -47,6 +48,8 @@ public class SubscriptionInterceptorLoader {
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired
private ApplicationContext myApplicationContext; private ApplicationContext myApplicationContext;
@Autowired @Autowired
private IInterceptorService myInterceptorRegistry; private IInterceptorService myInterceptorRegistry;
@ -68,9 +71,11 @@ public class SubscriptionInterceptorLoader {
private void loadSubscriptions() { private void loadSubscriptions() {
ourLog.info("Loading subscriptions into the SubscriptionRegistry..."); ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
// Activate scheduled subscription loads into the SubscriptionRegistry // Load active subscriptions into the SubscriptionRegistry and activate their channels
myApplicationContext.getBean(SubscriptionLoader.class); SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class);
loader.syncSubscriptions();
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size()); ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
ourLog.info("...{} subscription channels started", mySubscriptionChannelRegistry.size());
} }
@VisibleForTesting @VisibleForTesting

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.interceptor.api.*;
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;

View File

@ -143,7 +143,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
jobDetails.setRemainingResourceIds(resourceIds.stream().map(t->t.getValue()).collect(Collectors.toList())); jobDetails.setRemainingResourceIds(resourceIds.stream().map(t->t.getValue()).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(t->t.getValue()).collect(Collectors.toList())); jobDetails.setRemainingSearchUrls(searchUrls.stream().map(t->t.getValue()).collect(Collectors.toList()));
if (theSubscriptionId != null) { if (theSubscriptionId != null) {
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
} }
// Submit job for processing // Submit job for processing
@ -315,7 +315,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); msg.setSubscriptionId(theSubscriptionId);
return myExecutorService.submit(() -> { return myExecutorService.submit(() -> {
for (int i = 0; ; i++) { for (int i = 0; ; i++) {

View File

@ -4,7 +4,8 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber;
import org.hl7.fhir.dstu2.model.Subscription; import org.hl7.fhir.dstu2.model.Subscription;
@ -24,6 +25,8 @@ public class SubscriptionTestUtil {
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
public int getExecutorQueueSize() { public int getExecutorQueueSize() {
LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest(); LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
@ -76,7 +79,8 @@ public class SubscriptionTestUtil {
public void setEmailSender(IIdType theIdElement) { public void setEmailSender(IIdType theIdElement) {
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart()); ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) activeSubscription.getDeliveryHandlerForUnitTest(); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(activeSubscription.getChannelName());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) subscriptionChannelWithHandlers.getDeliveryHandlerForUnitTest();
subscriber.setEmailSender(myEmailSender); subscriber.setEmailSender(myEmailSender);
} }

View File

@ -399,7 +399,7 @@ public class InMemorySubscriptionMatcherR4Test {
subscription.setCriteriaString(criteria); subscription.setCriteriaString(criteria);
subscription.setIdElement(new IdType("Subscription", 123L)); subscription.setIdElement(new IdType("Subscription", 123L));
ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE);
msg.setSubscriptionId("Subscription/123"); msg.setSubscriptionId("123");
msg.setId(new IdType("Patient/ABC")); msg.setId(new IdType("Patient/ABC"));
InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg); InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg);
fail(); fail();

View File

@ -1,4 +1,5 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
@ -118,6 +119,11 @@
<version>${project.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<pluginManagement> <pluginManagement>

View File

@ -20,65 +20,37 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import com.google.common.annotations.VisibleForTesting; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.io.Closeable; public class ActiveSubscription {
import java.util.Collection;
import java.util.HashSet;
public class ActiveSubscription implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private CanonicalSubscription mySubscription; private CanonicalSubscription mySubscription;
private final SubscribableChannel mySubscribableChannel; private final String myChannelName;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>(); private final String myId;
private boolean flagForDeletion; private boolean flagForDeletion;
public ActiveSubscription(CanonicalSubscription theSubscription, SubscribableChannel theSubscribableChannel) { public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
mySubscription = theSubscription; mySubscription = theSubscription;
mySubscribableChannel = theSubscribableChannel; myChannelName = theChannelName;
myId = theSubscription.getIdPart();
} }
public CanonicalSubscription getSubscription() { public CanonicalSubscription getSubscription() {
return mySubscription; return mySubscription;
} }
public SubscribableChannel getSubscribableChannel() { public String getChannelName() {
return mySubscribableChannel; return myChannelName;
}
public void register(MessageHandler theHandler) {
mySubscribableChannel.subscribe(theHandler);
myDeliveryHandlerSet.add(theHandler);
}
public void unregister(MessageHandler theMessageHandler) {
if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler);
}
}
public IIdType getIdElement(FhirContext theFhirContext) {
return mySubscription.getIdElement(theFhirContext);
} }
public String getCriteriaString() { public String getCriteriaString() {
return mySubscription.getCriteriaString(); return mySubscription.getCriteriaString();
} }
@VisibleForTesting
public MessageHandler getDeliveryHandlerForUnitTest() {
return myDeliveryHandlerSet.iterator().next();
}
public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) { public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) {
mySubscription = theCanonicalizedSubscription; mySubscription = theCanonicalizedSubscription;
} }
@ -91,26 +63,11 @@ public class ActiveSubscription implements Closeable {
flagForDeletion = theFlagForDeletion; flagForDeletion = theFlagForDeletion;
} }
@Override public String getId() {
public void close() { return myId;
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
unregister(messageHandler);
}
if (mySubscribableChannel instanceof DisposableBean) {
try {
((DisposableBean) mySubscribableChannel).destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
} }
/** public CanonicalSubscriptionChannelType getChannelType() {
* Use close() instead return mySubscription.getChannelType();
* KHS 15 Apr 2019
*/
@Deprecated
public void unregisterAll() {
close();
} }
} }

View File

@ -24,10 +24,7 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
class ActiveSubscriptionCache { class ActiveSubscriptionCache {
@ -47,23 +44,24 @@ class ActiveSubscriptionCache {
return myCache.size(); return myCache.size();
} }
public void put(String theSubscriptionId, ActiveSubscription theValue) { public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
myCache.put(theSubscriptionId, theValue); myCache.put(theSubscriptionId, theActiveSubscription);
} }
public synchronized void remove(String theSubscriptionId) { public synchronized ActiveSubscription remove(String theSubscriptionId) {
Validate.notBlank(theSubscriptionId); Validate.notBlank(theSubscriptionId);
ActiveSubscription activeSubscription = myCache.get(theSubscriptionId); ActiveSubscription activeSubscription = myCache.get(theSubscriptionId);
if (activeSubscription == null) { if (activeSubscription == null) {
return; return null;
} }
activeSubscription.close();
myCache.remove(theSubscriptionId); myCache.remove(theSubscriptionId);
return activeSubscription;
} }
public void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection<String> theAllIds) {
List<String> retval = new ArrayList<>();
for (String next : new ArrayList<>(myCache.keySet())) { for (String next : new ArrayList<>(myCache.keySet())) {
ActiveSubscription activeSubscription = myCache.get(next); ActiveSubscription activeSubscription = myCache.get(next);
if (theAllIds.contains(next)) { if (theAllIds.contains(next)) {
@ -72,11 +70,12 @@ class ActiveSubscriptionCache {
} else { } else {
if (activeSubscription.isFlagForDeletion()) { if (activeSubscription.isFlagForDeletion()) {
ourLog.info("Unregistering Subscription/{}", next); ourLog.info("Unregistering Subscription/{}", next);
remove(next); retval.add(next);
} else { } else {
activeSubscription.setFlagForDeletion(true); activeSubscription.setFlagForDeletion(true);
} }
} }
} }
return retval;
} }
} }

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/ */
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

View File

@ -21,22 +21,24 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/ */
import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Optional; import java.util.Optional;
/** /**
@ -48,16 +50,14 @@ import java.util.Optional;
// TODO KHS Does jpa need a subscription registry if matching is disabled? // TODO KHS Does jpa need a subscription registry if matching is disabled?
@Component @Component
public class SubscriptionRegistry { public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired @Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer; private SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@Autowired @Autowired
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired @Autowired
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired
ModelConfig myModelConfig;
@Autowired @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@ -91,21 +91,12 @@ public class SubscriptionRegistry {
Validate.notNull(theSubscription); Validate.notNull(theSubscription);
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
SubscribableChannel deliveryChannel;
Optional<MessageHandler> deliveryHandler;
if (myModelConfig.isSubscriptionMatchingEnabled()) { String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized);
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(subscriptionId, canonicalized.getChannelType().toCode().toLowerCase());
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized);
} else {
deliveryChannel = null;
deliveryHandler = Optional.empty();
}
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, deliveryChannel);
deliveryHandler.ifPresent(activeSubscription::register);
ourLog.info("Registering active subscription {}", subscriptionId);
ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName);
mySubscriptionChannelRegistry.add(activeSubscription);
myActiveSubscriptionCache.put(subscriptionId, activeSubscription); myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
@ -116,12 +107,14 @@ public class SubscriptionRegistry {
return canonicalized; return canonicalized;
} }
public void unregisterSubscription(IIdType theId) { public void unregisterSubscription(String theSubscriptionId) {
Validate.notNull(theId); Validate.notNull(theSubscriptionId);
String subscriptionId = theId.getIdPart();
ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue()); ourLog.info("Unregistering active subscription {}", theSubscriptionId);
myActiveSubscriptionCache.remove(subscriptionId); ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
if (activeSubscription != null) {
mySubscriptionChannelRegistry.remove(activeSubscription);
}
} }
@PreDestroy @PreDestroy
@ -133,7 +126,11 @@ public class SubscriptionRegistry {
} }
void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
myActiveSubscriptionCache.unregisterAllSubscriptionsNotInCollection(theAllIds);
List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
for (String id : idsToDelete) {
unregisterSubscription(id);
}
} }
public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
@ -151,7 +148,7 @@ public class SubscriptionRegistry {
updateSubscription(theSubscription); updateSubscription(theSubscription);
return true; return true;
} }
unregisterSubscription(theSubscription.getIdElement()); unregisterSubscription(theSubscription.getIdElement().getIdPart());
} }
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
registerSubscription(theSubscription.getIdElement(), theSubscription); registerSubscription(theSubscription.getIdElement(), theSubscription);
@ -183,7 +180,7 @@ public class SubscriptionRegistry {
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
if (hasSubscription(theSubscription.getIdElement()).isPresent()) { if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
unregisterSubscription(theSubscription.getIdElement()); unregisterSubscription(theSubscription.getIdElement().getIdPart());
return true; return true;
} }
return false; return false;

View File

@ -0,0 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
public interface ISubscriptionDeliveryChannelNamer {
String nameFromSubscription(CanonicalSubscription theCanonicalSubscription);
}

View File

@ -0,0 +1,49 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
class SubscriptionChannelCache {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
private final Map<String, SubscriptionChannelWithHandlers> myCache = new ConcurrentHashMap<>();
public SubscriptionChannelWithHandlers get(String theChannelName) {
return myCache.get(theChannelName);
}
public int size() {
return myCache.size();
}
public void put(String theChannelName, SubscriptionChannelWithHandlers theValue) {
myCache.put(theChannelName, theValue);
}
synchronized void closeAndRemove(String theChannelName) {
Validate.notBlank(theChannelName);
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = myCache.get(theChannelName);
if (subscriptionChannelWithHandlers == null) {
return;
}
subscriptionChannelWithHandlers.close();
myCache.remove(theChannelName);
}
public boolean containsKey(String theChannelName) {
return myCache.containsKey(theChannelName);
}
void logForUnitTest() {
for (String key : myCache.keySet()) {
ourLog.info("SubscriptionChannelCache: {}", key);
}
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
/*- /*-
* #%L * #%L
@ -34,12 +34,8 @@ public class SubscriptionChannelFactory {
mySubscribableChannelFactory = theSubscribableChannelFactory; mySubscribableChannelFactory = theSubscribableChannelFactory;
} }
public SubscribableChannel newDeliveryChannel(String theSubscriptionId, String theChannelType) { public SubscribableChannel newDeliveryChannel(String theChannelName) {
String channelName = "subscription-delivery-" + return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
theChannelType +
"-" +
theSubscriptionId;
return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingChannel(String theChannelName) { public SubscribableChannel newMatchingChannel(String theChannelName) {

View File

@ -0,0 +1,99 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Optional;
@Component
public class SubscriptionChannelRegistry {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache();
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
// Key Channel Name, Value Subscription Id
private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
@Autowired
private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@Autowired
private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
private ModelConfig myModelConfig;
public synchronized void add(ActiveSubscription theActiveSubscription) {
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
return;
}
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
if (mySubscriptionChannelCache.containsKey(channelName)) {
ourLog.info("Channel {} already exists. Not creating.", channelName);
return;
}
SubscribableChannel deliveryChannel;
Optional<MessageHandler> deliveryHandler;
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers);
}
public synchronized void remove(ActiveSubscription theActiveSubscription) {
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
return;
}
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
}
// This was the last one. Close and remove the channel
if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName);
if (channel != null) {
channel.close();
}
mySubscriptionChannelCache.closeAndRemove(channelName);
}
}
public synchronized SubscriptionChannelWithHandlers get(String theChannelName) {
return mySubscriptionChannelCache.get(theChannelName);
}
public synchronized int size() {
return mySubscriptionChannelCache.size();
}
@VisibleForTesting
public void logForUnitTest() {
ourLog.info("{} Channels: {}", this, size());
mySubscriptionChannelCache.logForUnitTest();
for (String key : myActiveSubscriptionByChannelName.keySet()) {
Collection<String> list = myActiveSubscriptionByChannelName.get(key);
for (String value : list) {
ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value);
}
}
}
}

View File

@ -0,0 +1,66 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashSet;
public class SubscriptionChannelWithHandlers implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private final String myChannelName;
private final SubscribableChannel mySubscribableChannel;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) {
myChannelName = theChannelName;
mySubscribableChannel = theSubscribableChannel;
}
public void addHandler(MessageHandler theHandler) {
mySubscribableChannel.subscribe(theHandler);
myDeliveryHandlerSet.add(theHandler);
}
public void removeHandler(MessageHandler theMessageHandler) {
if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler);
}
}
@VisibleForTesting
public MessageHandler getDeliveryHandlerForUnitTest() {
return myDeliveryHandlerSet.iterator().next();
}
@Override
public void close() {
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
removeHandler(messageHandler);
}
if (mySubscribableChannel instanceof DisposableBean) {
tryDestroyChannel((DisposableBean) mySubscribableChannel);
}
}
private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
try {
ourLog.info("Destroying channel {}", myChannelName);
theSubscribableChannel.destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
public MessageChannel getChannel() {
return mySubscribableChannel;
}
}

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.springframework.stereotype.Service;
@Service
public class SubscriptionDeliveryChannelNamer implements ISubscriptionDeliveryChannelNamer {
@Override
public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) {
String channelType = theCanonicalSubscription.getChannelType().toCode().toLowerCase();
String subscriptionId = theCanonicalSubscription.getIdPart();
return "subscription-delivery-" +
channelType +
"-" +
subscriptionId;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
/*- /*-
* #%L * #%L
@ -20,16 +20,13 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Lookup; import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thymeleaf.util.Validate;
import java.util.Optional; import java.util.Optional;
@ -42,10 +39,10 @@ public abstract class SubscriptionDeliveryHandlerFactory {
@Lookup @Lookup
protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber(); protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber();
public Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) { public Optional<MessageHandler> createDeliveryHandler(CanonicalSubscriptionChannelType theChannelType) {
if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { if (theChannelType == CanonicalSubscriptionChannelType.EMAIL) {
return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender)); return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender));
} else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { } else if (theChannelType == CanonicalSubscriptionChannelType.RESTHOOK) {
return Optional.of(getSubscriptionDeliveringRestHookSubscriber()); return Optional.of(getSubscriptionDeliveringRestHookSubscriber());
} else { } else {
return Optional.empty(); return Optional.empty();

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.config;
*/ */
import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;

View File

@ -65,7 +65,7 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
switch (theResourceModifiedMessage.getOperationType()) { switch (theResourceModifiedMessage.getOperationType()) {
case DELETE: case DELETE:
if (isSubscription(theResourceModifiedMessage)) { if (isSubscription(theResourceModifiedMessage)) {
mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext)); mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext).getIdPart());
} }
return; return;
case CREATE: case CREATE:

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
@ -61,6 +62,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Override @Override
public void handleMessage(Message<?> theMessage) throws MessagingException { public void handleMessage(Message<?> theMessage) throws MessagingException {
@ -119,6 +122,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
if (isNotBlank(theMsg.getSubscriptionId())) { if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId()); ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
continue; continue;
} }
@ -133,7 +137,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
continue; continue;
} }
ourLog.debug("Subscription {} was matched by resource {} {}", ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getSubscription().getIdElement(myFhirContext).getValue(), nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(), resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository"); matchResult.isInMemory() ? "in-memory" : "by querying the repository");
@ -177,12 +181,12 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retval = false; boolean retval = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel();
if (deliveryChannel != null) { if (deliveryChannel != null) {
retval = true; retval = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel); trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
} else { } else {
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext)); ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
} }
return retval; return retval;
} }
@ -200,7 +204,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
} }
private String getId(ActiveSubscription theActiveSubscription) { private String getId(ActiveSubscription theActiveSubscription) {
return theActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue(); return theActiveSubscription.getId();
} }
private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) { private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber.websocket;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
@ -45,6 +47,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
@Autowired @Autowired
protected WebsocketConnectionValidator myWebsocketConnectionValidator; protected WebsocketConnectionValidator myWebsocketConnectionValidator;
@Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired @Autowired
private FhirContext myCtx; private FhirContext myCtx;
@ -102,26 +106,28 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
} }
private class BoundStaticSubscipriptionState implements IState, MessageHandler { private class BoundStaticSubscriptionState implements IState, MessageHandler {
private final WebSocketSession mySession; private final WebSocketSession mySession;
private final ActiveSubscription myActiveSubscription; private final ActiveSubscription myActiveSubscription;
public BoundStaticSubscipriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) {
mySession = theSession; mySession = theSession;
myActiveSubscription = theActiveSubscription; myActiveSubscription = theActiveSubscription;
theActiveSubscription.register(this); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(theActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.addHandler(this);
} }
@Override @Override
public void closing() { public void closing() {
myActiveSubscription.unregister(this); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(myActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.removeHandler(this);
} }
private void deliver() { private void deliver() {
try { try {
String payload = "ping " + myActiveSubscription.getIdElement(myCtx).getIdPart(); String payload = "ping " + myActiveSubscription.getId();
ourLog.info("Sending WebSocket message: {}", payload); ourLog.info("Sending WebSocket message: {}", payload);
mySession.sendMessage(new TextMessage(payload)); mySession.sendMessage(new TextMessage(payload));
} catch (IOException e) { } catch (IOException e) {
@ -153,7 +159,6 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
handleFailure(e); handleFailure(e);
} }
} }
} }
private class InitialState implements IState { private class InitialState implements IState {
@ -172,7 +177,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
return null; return null;
} }
myState = new BoundStaticSubscipriptionState(theSession, response.getActiveSubscription()); myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription());
return id; return id;
} }
@ -206,94 +211,3 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
} }
} }
// private IIdType bingSearch(WebSocketSession theSession, String theRemaining) {
// Subscription subscription = new Subscription();
// subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET);
// subscription.setStatus(SubscriptionStatus.ACTIVE);
// subscription.setCriteria(theRemaining);
//
// try {
// String params = theRemaining.substring(theRemaining.indexOf('?')+1);
// List<NameValuePair> paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&');
// EncodingEnum encoding = EncodingEnum.JSON;
// for (NameValuePair nameValuePair : paramValues) {
// if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) {
// EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue());
// if (nextEncoding != null) {
// encoding = nextEncoding;
// }
// }
// }
//
// IIdType id = ourSubscriptionDao.create(subscription).getId();
//
// mySubscriptionPid = ourSubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id);
// mySubscriptionId = subscription.getIdElement();
// myState = new BoundDynamicSubscriptionState(theSession, encoding);
//
// return id;
// } catch (UnprocessableEntityException e) {
// ourLog.warn("Failed to bind subscription: " + e.getMessage());
// try {
// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage()));
// } catch (IOException e2) {
// handleFailure(e2);
// }
// } catch (Exception e) {
// handleFailure(e);
// try {
// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included"));
// } catch (IOException e2) {
// handleFailure(e2);
// }
// }
// return null;
// }
//private class BoundDynamicSubscriptionState implements SubscriptionWebsocketHandler.IState {
//
// private EncodingEnum myEncoding;
// private WebSocketSession mySession;
//
// public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) {
// mySession = theSession;
// myEncoding = theEncoding;
// }
//
// @Override
// public void closing() {
// ourLog.info("Deleting subscription {}", mySubscriptionId);
// try {
// ourSubscriptionDao.delete(mySubscriptionId, null);
// } catch (Exception e) {
// handleFailure(e);
// }
// }
//
// @Override
// public void deliver(List<IBaseResource> theResults) {
// try {
// for (IBaseResource nextResource : theResults) {
// ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement());
// String encoded = myEncoding.newParser(ourCtx).encodeResourceToString(nextResource);
// String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded;
// mySession.sendMessage(new TextMessage(payload));
// }
// } catch (IOException e) {
// handleFailure(e);
// }
// }
//
// @Override
// public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
// try {
// theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
// } catch (IOException e) {
// handleFailure(e);
// }
// }
//
//}

View File

@ -1,18 +1,27 @@
package ca.uhn.fhir.jpa.subscription.module; package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.config.TestSubscriptionDstu3Config; import ca.uhn.fhir.jpa.subscription.module.config.TestSubscriptionDstu3Config;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ContextConfiguration(classes = {TestSubscriptionDstu3Config.class}) @ContextConfiguration(classes = {TestSubscriptionDstu3Config.class})
public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest {
@Autowired
protected SubscriptionRegistry mySubscriptionRegistry;
@Autowired
protected SubscriptionChannelRegistry mySubscriptionChannelRegistry;
private final SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper(); private final SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper();
@ -53,4 +62,22 @@ public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest {
protected Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) { protected Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) {
return mySubscriptionTestHelper.makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, status); return mySubscriptionTestHelper.makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, status);
} }
protected void clearRegistry() {
mySubscriptionRegistry.unregisterAllSubscriptions();
await().until(this::registryEmpty);
}
private boolean registryEmpty() {
return mySubscriptionRegistry.size() == 0 && mySubscriptionChannelRegistry.size() == 0;
}
protected void assertRegistrySize(int theSize) {
assertRegistrySize(theSize, theSize);
}
protected void assertRegistrySize(int theSubscriptionRegistrySize, int theSubscriptionChannelRegistrySize) {
assertEquals(theSubscriptionRegistrySize, mySubscriptionRegistry.size());
assertEquals(theSubscriptionChannelRegistrySize, mySubscriptionChannelRegistry.size());
}
} }

View File

@ -1,90 +1,101 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.model.primitive.IdDt;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class ActiveSubscriptionCacheTest { public class ActiveSubscriptionCacheTest {
static final String ID1 = "id1";
static final String ID2 = "id2";
@Test @Test
public void twoPhaseDelete() { public void twoPhaseDelete() {
ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache();
ActiveSubscription activeSub1 = new ActiveSubscription(null, null); ActiveSubscription activeSub1 = buildActiveSubscription(ID1);
String id1 = "id1"; activeSubscriptionCache.put(ID1, activeSub1);
activeSubscriptionCache.put(id1, activeSub1);
assertFalse(activeSub1.isFlagForDeletion()); assertFalse(activeSub1.isFlagForDeletion());
List<String> saveIds = new ArrayList<>(); List<String> saveIds = new ArrayList<>();
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertTrue(activeSub1.isFlagForDeletion()); assertTrue(activeSub1.isFlagForDeletion());
assertNotNull(activeSubscriptionCache.get(id1)); assertNotNull(activeSubscriptionCache.get(ID1));
assertEquals(0, idsToDelete.size());
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertNull(activeSubscriptionCache.get(id1)); assertThat(idsToDelete, containsInAnyOrder(ID1));
}
private ActiveSubscription buildActiveSubscription(String theId) {
CanonicalSubscription canonicalSubscription = new CanonicalSubscription();
canonicalSubscription.setIdElement(new IdDt(theId));
return new ActiveSubscription(canonicalSubscription, null);
} }
@Test @Test
public void secondPassUnflags() { public void secondPassUnflags() {
ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache();
ActiveSubscription activeSub1 = new ActiveSubscription(null, null); ActiveSubscription activeSub1 = buildActiveSubscription(ID1);
String id1 = "id1";
List<String> saveIds = new ArrayList<>(); List<String> saveIds = new ArrayList<>();
activeSubscriptionCache.put(id1, activeSub1); activeSubscriptionCache.put(ID1, activeSub1);
assertFalse(activeSub1.isFlagForDeletion()); assertFalse(activeSub1.isFlagForDeletion());
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertTrue(activeSub1.isFlagForDeletion()); assertTrue(activeSub1.isFlagForDeletion());
assertNotNull(activeSubscriptionCache.get(id1)); assertNotNull(activeSubscriptionCache.get(ID1));
assertEquals(0, idsToDelete.size());
saveIds.add(id1); saveIds.add(ID1);
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertFalse(activeSub1.isFlagForDeletion()); assertFalse(activeSub1.isFlagForDeletion());
assertNotNull(activeSubscriptionCache.get(id1)); assertEquals(0, idsToDelete.size());
} }
@Test @Test
public void onlyFlaggedDeleted() { public void onlyFlaggedDeleted() {
ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache();
ActiveSubscription activeSub1 = new ActiveSubscription(null, null);
String id1 = "id1"; ActiveSubscription activeSub1 = buildActiveSubscription(ID1);
ActiveSubscription activeSub2 = new ActiveSubscription(null, null); ActiveSubscription activeSub2 = buildActiveSubscription(ID2);
String id2 = "id2"; activeSubscriptionCache.put(activeSub1.getId(), activeSub1);
activeSubscriptionCache.put(id1, activeSub1); activeSubscriptionCache.put(activeSub2.getId(), activeSub2);
activeSubscriptionCache.put(id2, activeSub2);
activeSub1.setFlagForDeletion(true); activeSub1.setFlagForDeletion(true);
List<String> saveIds = new ArrayList<>(); List<String> saveIds = new ArrayList<>();
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertNull(activeSubscriptionCache.get(id1)); assertThat(idsToDelete, containsInAnyOrder(ID1));
assertNotNull(activeSubscriptionCache.get(id2)); assertNotNull(activeSubscriptionCache.get(ID2));
assertTrue(activeSub2.isFlagForDeletion()); assertTrue(activeSub2.isFlagForDeletion());
} }
@Test @Test
public void onListSavesAndUnmarksFlag() { public void onListSavesAndUnmarksFlag() {
ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache();
ActiveSubscription activeSub1 = new ActiveSubscription(null, null); ActiveSubscription activeSub1 = buildActiveSubscription(ID1);
String id1 = "id1";
ActiveSubscription activeSub2 = new ActiveSubscription(null, null); ActiveSubscription activeSub2 = buildActiveSubscription(ID2);
String id2 = "id2";
activeSubscriptionCache.put(id1, activeSub1); activeSubscriptionCache.put(ID1, activeSub1);
activeSubscriptionCache.put(id2, activeSub2); activeSubscriptionCache.put(ID2, activeSub2);
activeSub1.setFlagForDeletion(true); activeSub1.setFlagForDeletion(true);
List<String> saveIds = new ArrayList<>(); List<String> saveIds = new ArrayList<>();
saveIds.add(id1); saveIds.add(ID1);
saveIds.add(id2); saveIds.add(ID2);
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
assertNotNull(activeSubscriptionCache.get(id1)); assertNotNull(activeSubscriptionCache.get(ID1));
assertFalse(activeSub1.isFlagForDeletion()); assertFalse(activeSub1.isFlagForDeletion());
assertNotNull(activeSubscriptionCache.get(id2)); assertNotNull(activeSubscriptionCache.get(ID2));
assertFalse(activeSub2.isFlagForDeletion()); assertFalse(activeSub2.isFlagForDeletion());
} }

View File

@ -0,0 +1,33 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
public abstract class BaseSubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
public static final String SUBSCRIPTION_ID = "1";
public static final String ORIG_CRITERIA = "Patient?";
public static final String NEW_CRITERIA = "Observation?";
@After
public void clearRegistryAfter() {
super.clearRegistry();
}
protected Subscription createSubscription() {
Subscription subscription = new Subscription();
subscription.setId(SUBSCRIPTION_ID);
subscription.setCriteria(ORIG_CRITERIA);
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK);
return subscription;
}
protected void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) {
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(theResthook);
channel.setPayload("application/json");
channel.setEndpoint("http://unused.test.endpoint/");
theSubscription.setChannel(channel);
}
}

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.Test;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest {
private static final String OTHER_ID = "OTHER_ID";
@Configuration
public static class SpringConfig {
@Primary
@Bean
ISubscriptionDeliveryChannelNamer subscriptionDeliveryChannelNamer() {
return new SharedNamer();
}
private class SharedNamer implements ISubscriptionDeliveryChannelNamer {
@Override
public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) {
return "shared";
}
}
}
@Test
public void testTwoSubscriptionsOneChannel() {
Subscription subscription = createSubscription();
assertRegistrySize(0);
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertRegistrySize(1);
Subscription otherSubscription = createSubscription();
otherSubscription.setId(OTHER_ID);
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(otherSubscription);
assertRegistrySize(2, 1);
}
}

View File

@ -1,45 +1,24 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test { public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest {
public static final String SUBSCRIPTION_ID = "1";
public static final String ORIG_CRITERIA = "Patient?";
public static final String NEW_CRITERIA = "Observation?";
@Autowired
SubscriptionRegistry mySubscriptionRegistry;
@Before
public void clearRegistryBefore() {
mySubscriptionRegistry.unregisterAllSubscriptions();
}
@After
public void clearRegistryAfter() {
mySubscriptionRegistry.unregisterAllSubscriptions();
}
@Test @Test
public void updateSubscriptionReusesActiveSubscription() { public void updateSubscriptionReusesActiveSubscription() {
Subscription subscription = createSubscription(); Subscription subscription = createSubscription();
assertEquals(0, mySubscriptionRegistry.size()); assertRegistrySize(0);
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size()); assertRegistrySize(1);
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
subscription.setCriteria(NEW_CRITERIA); subscription.setCriteria(NEW_CRITERIA);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size()); assertRegistrySize(1);
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString()); assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString());
// The same object // The same object
@ -49,9 +28,10 @@ public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
@Test @Test
public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() { public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() {
Subscription subscription = createSubscription(); Subscription subscription = createSubscription();
assertEquals(0, mySubscriptionRegistry.size()); assertRegistrySize(0);
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size()); assertRegistrySize(1);
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
@ -59,26 +39,21 @@ public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size()); assertRegistrySize(1);
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
// A new object // A new object
assertFalse(newActiveSubscription == origActiveSubscription); assertFalse(newActiveSubscription == origActiveSubscription);
} }
private Subscription createSubscription() { @Test
Subscription subscription = new Subscription(); public void updateRemove() {
subscription.setId(SUBSCRIPTION_ID); Subscription subscription = createSubscription();
subscription.setCriteria(ORIG_CRITERIA); assertRegistrySize(0);
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK); assertRegistrySize(1);
return subscription; mySubscriptionRegistry.unregisterSubscription(subscription.getId());
assertRegistrySize(0);
} }
private void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) {
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(theResthook);
channel.setPayload("application/json");
channel.setEndpoint("http://unused.test.endpoint/");
theSubscription.setChannel(channel);
}
} }

View File

@ -0,0 +1,60 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.model.primitive.IdDt;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
@RunWith(SpringRunner.class)
public class SubscriptionChannelRegistryTest {
private static final String TEST_CHANNEL_NAME = "TEST_CHANNEL";
@Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@MockBean
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@MockBean
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@MockBean
ModelConfig myModelConfig;
@Configuration
static class SpringConfig {
@Bean
SubscriptionChannelRegistry subscriptionChannelRegistry() {
return new SubscriptionChannelRegistry();
}
}
@Test
public void testAddAddRemoveRemove() {
when(myModelConfig.isSubscriptionMatchingEnabled()).thenReturn(true);
CanonicalSubscription cansubA = new CanonicalSubscription();
cansubA.setIdElement(new IdDt("A"));
ActiveSubscription activeSubscriptionA = new ActiveSubscription(cansubA, TEST_CHANNEL_NAME);
CanonicalSubscription cansubB = new CanonicalSubscription();
cansubB.setIdElement(new IdDt("B"));
ActiveSubscription activeSubscriptionB = new ActiveSubscription(cansubB, TEST_CHANNEL_NAME);
assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.add(activeSubscriptionA);
assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.add(activeSubscriptionB);
mySubscriptionChannelRegistry.remove(activeSubscriptionB);
assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.remove(activeSubscriptionA);
assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME));
}
}

View File

@ -7,13 +7,17 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.test.concurrency.IPointcutLatch; import ca.uhn.test.concurrency.IPointcutLatch;
import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.test.concurrency.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.annotation.Update;
@ -61,6 +65,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
@Autowired @Autowired
private SubscriptionLoader mySubscriptionLoader; private SubscriptionLoader mySubscriptionLoader;
@Autowired
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
protected String myCode = "1000000050"; protected String myCode = "1000000050";
@ -80,8 +86,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourCreatedObservations.clear(); ourCreatedObservations.clear();
ourUpdatedObservations.clear(); ourUpdatedObservations.clear();
ourContentTypes.clear(); ourContentTypes.clear();
CanonicalSubscription canonicalSubscription = new CanonicalSubscription();
canonicalSubscription.setIdElement(new IdDt("test"));
canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK);
mySubscriptionRegistry.unregisterAllSubscriptions(); mySubscriptionRegistry.unregisterAllSubscriptions();
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalSubscription));
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
@ -93,6 +102,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
mySubscriptionMatchingPost.clear(); mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear(); mySubscriptionActivatedPost.clear();
ourObservationListener.clear(); ourObservationListener.clear();
super.clearRegistry();
} }
public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException { public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {

View File

@ -16,7 +16,7 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
private static final int MOCK_FHIR_CLIENT_FAILURES = 5; private static final int MOCK_FHIR_CLIENT_FAILURES = 3;
@Autowired @Autowired
private MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider; private MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider;
@Autowired @Autowired

View File

@ -1,9 +1,6 @@
package ca.uhn.fhir.jpa.subscription.module.standalone; package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -16,7 +13,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
private static final int MOCK_FHIR_CLIENT_FAILURES = 5; private static final int MOCK_FHIR_CLIENT_FAILURES = 3;
@Autowired @Autowired
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;

View File

@ -12,7 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*; import static org.junit.Assert.*;