Work on subscription cleanup

This commit is contained in:
jamesagnew 2020-04-05 07:21:05 -04:00
parent 79da1578cb
commit 0a28c0c060
6 changed files with 39 additions and 40 deletions

View File

@ -81,7 +81,7 @@ public class SubscriptionTestUtil {
public void setEmailSender(IIdType theIdElement) { public void setEmailSender(IIdType theIdElement) {
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart()); ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(activeSubscription.getChannelName()); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(activeSubscription.getChannelName());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) subscriptionChannelWithHandlers.getDeliveryHandlerForUnitTest(); SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) subscriptionChannelWithHandlers.getDeliveryHandlerForUnitTest();
subscriber.setEmailSender(myEmailSender); subscriber.setEmailSender(myEmailSender);
} }

View File

@ -38,6 +38,10 @@ public class SubscriptionChannelFactory {
@Autowired @Autowired
private IQueueChannelFactory mySubscribableChannelFactory; private IQueueChannelFactory mySubscribableChannelFactory;
public MessageChannel newDeliverySendingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers());
}
public SubscribableChannel newDeliveryChannel(String theChannelName) { public SubscribableChannel newDeliveryChannel(String theChannelName) {
SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers());
return new BroadcastingSubscribableChannelWrapper(channel); return new BroadcastingSubscribableChannelWrapper(channel);

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -29,46 +28,48 @@ import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
public class SubscriptionChannelRegistry { public class SubscriptionChannelRegistry {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache(); private final SubscriptionChannelCache myDeliveryReceiverChannels = new SubscriptionChannelCache();
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
// Key Channel Name, Value Subscription Id // Key Channel Name, Value Subscription Id
private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
private final Map<String, MessageChannel> myChannelNameToSender = new ConcurrentHashMap<>();
@Autowired @Autowired
private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@Autowired @Autowired
private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
private ModelConfig myModelConfig;
public synchronized void add(ActiveSubscription theActiveSubscription) { public synchronized void add(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName(); String channelName = theActiveSubscription.getChannelName();
ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName); ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
if (mySubscriptionChannelCache.containsKey(channelName)) { if (myDeliveryReceiverChannels.containsKey(channelName)) {
ourLog.info("Channel {} already exists. Not creating.", channelName); ourLog.info("Channel {} already exists. Not creating.", channelName);
return; return;
} }
SubscribableChannel deliveryChannel; SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
Optional<MessageHandler> deliveryHandler; Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers); myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers);
MessageChannel sendingChannel = mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(channelName);
myChannelNameToSender.put(channelName, sendingChannel);
} }
public synchronized void remove(ActiveSubscription theActiveSubscription) { public synchronized void remove(ActiveSubscription theActiveSubscription) {
@ -81,31 +82,25 @@ public class SubscriptionChannelRegistry {
// This was the last one. Close and remove the channel // This was the last one. Close and remove the channel
if (!myActiveSubscriptionByChannelName.containsKey(channelName)) { if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName); SubscriptionChannelWithHandlers channel = myDeliveryReceiverChannels.get(channelName);
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
} }
mySubscriptionChannelCache.closeAndRemove(channelName); myDeliveryReceiverChannels.closeAndRemove(channelName);
} }
myChannelNameToSender.remove(channelName);
} }
public synchronized SubscriptionChannelWithHandlers get(String theChannelName) { public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) {
return mySubscriptionChannelCache.get(theChannelName); return myDeliveryReceiverChannels.get(theChannelName);
}
public synchronized MessageChannel getDeliverySenderChannel(String theChannelName) {
return myChannelNameToSender.get(theChannelName);
} }
public synchronized int size() { public synchronized int size() {
return mySubscriptionChannelCache.size(); return myDeliveryReceiverChannels.size();
}
@VisibleForTesting
public void logForUnitTest() {
ourLog.info("{} Channels: {}", this, size());
mySubscriptionChannelCache.logForUnitTest();
for (String key : myActiveSubscriptionByChannelName.keySet()) {
Collection<String> list = myActiveSubscriptionByChannelName.get(key);
for (String value : list) {
ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value);
}
}
} }
} }

View File

@ -115,13 +115,13 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
mySession = theSession; mySession = theSession;
myActiveSubscription = theActiveSubscription; myActiveSubscription = theActiveSubscription;
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(theActiveSubscription.getChannelName()); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(theActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.addHandler(this); subscriptionChannelWithHandlers.addHandler(this);
} }
@Override @Override
public void closing() { public void closing() {
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(myActiveSubscription.getChannelName()); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(myActiveSubscription.getChannelName());
subscriptionChannelWithHandlers.removeHandler(this); subscriptionChannelWithHandlers.removeHandler(this);
} }

View File

@ -7,13 +7,13 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
@ -189,7 +189,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retVal = false; boolean retVal = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel(); MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName());
if (deliveryChannel != null) { if (deliveryChannel != null) {
retVal = true; retVal = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel); trySendToDeliveryChannel(wrappedMsg, deliveryChannel);

View File

@ -49,13 +49,13 @@ public class SubscriptionChannelRegistryTest {
cansubB.setIdElement(new IdDt("B")); cansubB.setIdElement(new IdDt("B"));
ActiveSubscription activeSubscriptionB = new ActiveSubscription(cansubB, TEST_CHANNEL_NAME); ActiveSubscription activeSubscriptionB = new ActiveSubscription(cansubB, TEST_CHANNEL_NAME);
assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); assertNull(mySubscriptionChannelRegistry.getDeliveryReceiverChannel(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.add(activeSubscriptionA); mySubscriptionChannelRegistry.add(activeSubscriptionA);
assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); assertNotNull(mySubscriptionChannelRegistry.getDeliveryReceiverChannel(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.add(activeSubscriptionB); mySubscriptionChannelRegistry.add(activeSubscriptionB);
mySubscriptionChannelRegistry.remove(activeSubscriptionB); mySubscriptionChannelRegistry.remove(activeSubscriptionB);
assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); assertNotNull(mySubscriptionChannelRegistry.getDeliveryReceiverChannel(TEST_CHANNEL_NAME));
mySubscriptionChannelRegistry.remove(activeSubscriptionA); mySubscriptionChannelRegistry.remove(activeSubscriptionA);
assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); assertNull(mySubscriptionChannelRegistry.getDeliveryReceiverChannel(TEST_CHANNEL_NAME));
} }
} }