Separated ChannelRegistry from ActiveSubscriptionRegistry so we can support a many-to-one relationship there

All tests pass
This commit is contained in:
Ken Stevens 2019-09-30 15:01:47 -04:00
parent a9699c3cf5
commit fd8b5206e7
21 changed files with 262 additions and 128 deletions

View File

@ -28,7 +28,7 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;

View File

@ -4,8 +4,8 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.interceptor.api.*;
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber;
@ -24,6 +25,8 @@ public class SubscriptionTestUtil {
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
public int getExecutorQueueSize() { public int getExecutorQueueSize() {
LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest(); LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
@ -76,7 +79,7 @@ public class SubscriptionTestUtil {
public void setEmailSender(IIdType theIdElement) { public void setEmailSender(IIdType theIdElement) {
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart()); ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) activeSubscription.getDeliveryHandlerForUnitTest(); SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) mySubscriptionChannelRegistry.get(activeSubscription.getChannelName()).getDeliveryHandlerForUnitTest();
subscriber.setEmailSender(myEmailSender); subscriber.setEmailSender(myEmailSender);
} }

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;

View File

@ -22,47 +22,28 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageHandler;
import java.io.Closeable; public class ActiveSubscription {
import java.util.Collection;
import java.util.HashSet;
public class ActiveSubscription implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private CanonicalSubscription mySubscription; private CanonicalSubscription mySubscription;
private final ISubscribableChannel mySubscribableChannel; private final String myChannelName;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
private boolean flagForDeletion; private boolean flagForDeletion;
public ActiveSubscription(CanonicalSubscription theSubscription, ISubscribableChannel theSubscribableChannel) { public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
mySubscription = theSubscription; mySubscription = theSubscription;
mySubscribableChannel = theSubscribableChannel; myChannelName = theChannelName;
} }
public CanonicalSubscription getSubscription() { public CanonicalSubscription getSubscription() {
return mySubscription; return mySubscription;
} }
public ISubscribableChannel getSubscribableChannel() { public String getChannelName() {
return mySubscribableChannel; return myChannelName;
}
public void register(MessageHandler theHandler) {
mySubscribableChannel.subscribe(theHandler);
myDeliveryHandlerSet.add(theHandler);
}
public void unregister(MessageHandler theMessageHandler) {
if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler);
}
} }
public IIdType getIdElement(FhirContext theFhirContext) { public IIdType getIdElement(FhirContext theFhirContext) {
@ -73,11 +54,6 @@ public class ActiveSubscription implements Closeable {
return mySubscription.getCriteriaString(); return mySubscription.getCriteriaString();
} }
@VisibleForTesting
public MessageHandler getDeliveryHandlerForUnitTest() {
return myDeliveryHandlerSet.iterator().next();
}
public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) { public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) {
mySubscription = theCanonicalizedSubscription; mySubscription = theCanonicalizedSubscription;
} }
@ -89,37 +65,4 @@ public class ActiveSubscription implements Closeable {
public void setFlagForDeletion(boolean theFlagForDeletion) { public void setFlagForDeletion(boolean theFlagForDeletion) {
flagForDeletion = theFlagForDeletion; flagForDeletion = theFlagForDeletion;
} }
@Override
public void close() {
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
unregister(messageHandler);
}
if (mySubscribableChannel instanceof DisposableBean) {
int subscriberCount = mySubscribableChannel.getSubscriberCount();
if (subscriberCount > 0) {
ourLog.info("Channel for subscription {} still has {} subscribers. Not destroying.", mySubscription.getIdPart(), subscriberCount);
} else {
ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", mySubscription.getIdPart());
tryDestroyChannel((DisposableBean) mySubscribableChannel);
}
}
}
private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
try {
theSubscribableChannel.destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
/**
* Use close() instead
* KHS 15 Apr 2019
*/
@Deprecated
public void unregisterAll() {
close();
}
} }

View File

@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L% * #L%
*/ */
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,20 +49,20 @@ class ActiveSubscriptionCache {
return myCache.size(); return myCache.size();
} }
public void put(String theSubscriptionId, ActiveSubscription theValue) { public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
myCache.put(theSubscriptionId, theValue); myCache.put(theSubscriptionId, theActiveSubscription);
} }
public synchronized void remove(String theSubscriptionId) { public synchronized ActiveSubscription remove(String theSubscriptionId) {
Validate.notBlank(theSubscriptionId); Validate.notBlank(theSubscriptionId);
ActiveSubscription activeSubscription = myCache.get(theSubscriptionId); ActiveSubscription activeSubscription = myCache.get(theSubscriptionId);
if (activeSubscription == null) { if (activeSubscription == null) {
return; return null;
} }
activeSubscription.close();
myCache.remove(theSubscriptionId); myCache.remove(theSubscriptionId);
return activeSubscription;
} }
public void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { public void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {

View File

@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/ */
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

View File

@ -23,14 +23,14 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -52,11 +52,9 @@ public class SubscriptionRegistry {
@Autowired @Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer; SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@Autowired @Autowired
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired @Autowired
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired
ModelConfig myModelConfig;
@Autowired @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@ -90,21 +88,12 @@ public class SubscriptionRegistry {
Validate.notNull(theSubscription); Validate.notNull(theSubscription);
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
ISubscribableChannel deliveryChannel;
Optional<MessageHandler> deliveryHandler;
if (myModelConfig.isSubscriptionMatchingEnabled()) { String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized);
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(canonicalized);
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized);
} else {
deliveryChannel = null;
deliveryHandler = Optional.empty();
}
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, deliveryChannel); ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName);
deliveryHandler.ifPresent(activeSubscription::register); mySubscriptionChannelRegistry.add(activeSubscription);
myActiveSubscriptionCache.put(subscriptionId, activeSubscription); myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
@ -120,7 +109,10 @@ public class SubscriptionRegistry {
String subscriptionId = theId.getIdPart(); String subscriptionId = theId.getIdPart();
ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue()); ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue());
myActiveSubscriptionCache.remove(subscriptionId); ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(subscriptionId);
if (activeSubscription != null) {
mySubscriptionChannelRegistry.remove(activeSubscription);
}
} }
@PreDestroy @PreDestroy

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
import org.springframework.integration.support.management.SubscribableChannelManagement; import org.springframework.integration.support.management.SubscribableChannelManagement;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;

View File

@ -0,0 +1,47 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.apache.commons.lang3.Validate;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
class SubscriptionChannelCache {
private final Map<String, SubscriptionChannelWithHandlers> myCache = new ConcurrentHashMap<>();
public SubscriptionChannelWithHandlers get(String theChannelName) {
return myCache.get(theChannelName);
}
public Collection<SubscriptionChannelWithHandlers> getAll() {
return Collections.unmodifiableCollection(myCache.values());
}
public int size() {
return myCache.size();
}
public void put(String theChannelName, SubscriptionChannelWithHandlers theValue) {
myCache.put(theChannelName, theValue);
}
public synchronized void remove(String theChannelName) {
Validate.notBlank(theChannelName);
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = myCache.get(theChannelName);
if (subscriptionChannelWithHandlers == null) {
return;
}
subscriptionChannelWithHandlers.close();
myCache.remove(theChannelName);
}
public boolean containsKey(String theChannelName) {
return myCache.containsKey(theChannelName);
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
/*- /*-
* #%L * #%L
@ -21,6 +21,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/ */
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -29,17 +32,13 @@ public class SubscriptionChannelFactory {
private ISubscribableChannelFactory mySubscribableChannelFactory; private ISubscribableChannelFactory mySubscribableChannelFactory;
@Autowired
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired @Autowired
public SubscriptionChannelFactory(ISubscribableChannelFactory theSubscribableChannelFactory) { public SubscriptionChannelFactory(ISubscribableChannelFactory theSubscribableChannelFactory) {
mySubscribableChannelFactory = theSubscribableChannelFactory; mySubscribableChannelFactory = theSubscribableChannelFactory;
} }
public ISubscribableChannel newDeliveryChannel(CanonicalSubscription theCanonicalSubscription) { public ISubscribableChannel newDeliveryChannel(String theChannelName) {
String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public ISubscribableChannel newMatchingChannel(String theChannelName) { public ISubscribableChannel newMatchingChannel(String theChannelName) {

View File

@ -0,0 +1,62 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class SubscriptionChannelRegistry {
private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache();
// This map is a reference count so we know to destroy the channel if there are no more active subscriptions using it
private final Multimap<String, ActiveSubscription> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
@Autowired
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@Autowired
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
ModelConfig myModelConfig;
public void add(ActiveSubscription theActiveSubscription) {
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
return;
}
String channelName = theActiveSubscription.getChannelName();
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription);
if (mySubscriptionChannelCache.containsKey(channelName)) {
return;
}
ISubscribableChannel deliveryChannel;
Optional<MessageHandler> deliveryHandler;
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getSubscription().getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers);
}
public void remove(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName();
myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription);
// FIXME KHS test
// This was the last one. Shut down the channel
if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName);
channel.close();
}
}
public SubscriptionChannelWithHandlers get(String theChannelName) {
return mySubscriptionChannelCache.get(theChannelName);
}
}

View File

@ -0,0 +1,70 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashSet;
public class SubscriptionChannelWithHandlers implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private final String myChannelName;
private final ISubscribableChannel mySubscribableChannel;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
public SubscriptionChannelWithHandlers(String theChannelName, ISubscribableChannel theSubscribableChannel) {
myChannelName = theChannelName;
mySubscribableChannel = theSubscribableChannel;
}
public void addHandler(MessageHandler theHandler) {
mySubscribableChannel.subscribe(theHandler);
myDeliveryHandlerSet.add(theHandler);
}
public void removeHandler(MessageHandler theMessageHandler) {
if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler);
}
}
@VisibleForTesting
public MessageHandler getDeliveryHandlerForUnitTest() {
return myDeliveryHandlerSet.iterator().next();
}
@Override
public void close() {
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
removeHandler(messageHandler);
}
if (mySubscribableChannel instanceof DisposableBean) {
int subscriberCount = mySubscribableChannel.getSubscriberCount();
if (subscriberCount > 0) {
ourLog.info("Channel {} still has {} subscribers. Not destroying.", myChannelName, subscriberCount);
} else {
ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", myChannelName);
tryDestroyChannel((DisposableBean) mySubscribableChannel);
}
}
}
private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
try {
theSubscribableChannel.destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
public MessageChannel getChannel() {
return mySubscribableChannel;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.module.cache; package ca.uhn.fhir.jpa.subscription.module.channel;
/*- /*-
* #%L * #%L
@ -42,10 +42,10 @@ public abstract class SubscriptionDeliveryHandlerFactory {
@Lookup @Lookup
protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber(); protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber();
public Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) { public Optional<MessageHandler> createDeliveryHandler(CanonicalSubscriptionChannelType theChannelType) {
if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { if (theChannelType == CanonicalSubscriptionChannelType.EMAIL) {
return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender)); return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender));
} else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { } else if (theChannelType == CanonicalSubscriptionChannelType.RESTHOOK) {
return Optional.of(getSubscriptionDeliveringRestHookSubscriber()); return Optional.of(getSubscriptionDeliveringRestHookSubscriber());
} else { } else {
return Optional.empty(); return Optional.empty();

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.config;
*/ */
import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
@ -61,6 +62,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Override @Override
public void handleMessage(Message<?> theMessage) throws MessagingException { public void handleMessage(Message<?> theMessage) throws MessagingException {
@ -177,7 +180,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retval = false; boolean retval = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel();
if (deliveryChannel != null) { if (deliveryChannel != null) {
retval = true; retval = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel); trySendToDeliveryChannel(wrappedMsg, deliveryChannel);

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber.websocket;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
@ -45,6 +47,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
@Autowired @Autowired
protected WebsocketConnectionValidator myWebsocketConnectionValidator; protected WebsocketConnectionValidator myWebsocketConnectionValidator;
@Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired @Autowired
private FhirContext myCtx; private FhirContext myCtx;
@ -102,21 +106,23 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
} }
private class BoundStaticSubscipriptionState implements IState, MessageHandler { private class BoundStaticSubscriptionState implements IState, MessageHandler {
private final WebSocketSession mySession; private final WebSocketSession mySession;
private final ActiveSubscription myActiveSubscription; private final ActiveSubscription myActiveSubscription;
public BoundStaticSubscipriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) {
mySession = theSession; mySession = theSession;
myActiveSubscription = theActiveSubscription; myActiveSubscription = theActiveSubscription;
theActiveSubscription.register(this); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(theActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.addHandler(this);
} }
@Override @Override
public void closing() { public void closing() {
myActiveSubscription.unregister(this); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(myActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.removeHandler(this);
} }
private void deliver() { private void deliver() {
@ -172,7 +178,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
return null; return null;
} }
myState = new BoundStaticSubscipriptionState(theSession, response.getActiveSubscription()); myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription());
return id; return id;
} }

View File

@ -10,8 +10,9 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
@ -60,10 +61,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
IInterceptorService myInterceptorRegistry; IInterceptorService myInterceptorRegistry;
@Autowired @Autowired
protected SubscriptionRegistry mySubscriptionRegistry; protected SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
@Autowired @Autowired
private SubscriptionLoader mySubscriptionLoader; private SubscriptionLoader mySubscriptionLoader;
@Autowired
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
protected String myCode = "1000000050"; protected String myCode = "1000000050";
@ -87,8 +90,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
canonicalSubscription.setIdElement(new IdDt("test")); canonicalSubscription.setIdElement(new IdDt("test"));
canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK); canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK);
mySubscriptionRegistry.unregisterAllSubscriptions(); mySubscriptionRegistry.unregisterAllSubscriptions();
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(canonicalSubscription); ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalSubscription));
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
} }
@ -110,11 +113,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return theResource; return theResource;
} }
protected void initSubscriptionLoader(List<Subscription> subscriptions, String uuid) throws InterruptedException { protected void initSubscriptionLoader(List<Subscription> subscriptions, String uuid) throws InterruptedException {
myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid)); myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid));
mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
} }
protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint);
mySubscriptionActivatedPost.setExpectedCount(1); mySubscriptionActivatedPost.setExpectedCount(1);
@ -157,11 +160,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourListenerServer.setHandler(proxyHandler); ourListenerServer.setHandler(proxyHandler);
JettyUtil.startServer(ourListenerServer); JettyUtil.startServer(ourListenerServer);
ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer);
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
FhirContext context = ourListenerRestServer.getFhirContext(); FhirContext context = ourListenerRestServer.getFhirContext();
//Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow)
context.getValidationSupport().fetchAllStructureDefinitions(context); context.getValidationSupport().fetchAllStructureDefinitions(context);
} }
@AfterClass @AfterClass
@ -206,6 +209,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
} }
@Override @Override
public void clear() { updateLatch.clear();} public void clear() {
updateLatch.clear();
}
} }
} }