From 4c8e330669e651d44d5845288810c500a4cb2a39 Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Mon, 30 Sep 2019 17:50:12 -0400 Subject: [PATCH] change channel cache to us id instead of object --- .../module/cache/ActiveSubscription.java | 15 +++++++++++---- .../channel/SubscriptionChannelRegistry.java | 13 ++++++------- .../SubscriptionMatchingSubscriber.java | 6 +++--- .../websocket/SubscriptionWebsocketHandler.java | 2 +- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java index ab2e464cc0c..50aa89dfbc6 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,11 +32,13 @@ public class ActiveSubscription { private CanonicalSubscription mySubscription; private final String myChannelName; + private final String myId; private boolean flagForDeletion; public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) { mySubscription = theSubscription; myChannelName = theChannelName; + myId = theSubscription.getIdPart(); } public CanonicalSubscription getSubscription() { @@ -46,10 +49,6 @@ public class ActiveSubscription { return myChannelName; } - public IIdType getIdElement(FhirContext theFhirContext) { - return mySubscription.getIdElement(theFhirContext); - } - public String getCriteriaString() { return mySubscription.getCriteriaString(); } @@ -65,4 +64,12 @@ public class ActiveSubscription { public void setFlagForDeletion(boolean theFlagForDeletion) { flagForDeletion = theFlagForDeletion; } + + public String getId() { + return myId; + } + + public CanonicalSubscriptionChannelType getChannelType() { + return mySubscription.getChannelType(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java index 78f81b18ba1..0a299e0d029 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java @@ -20,7 +20,8 @@ public class SubscriptionChannelRegistry { private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache(); // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it - private final Multimap myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); + // Key Channel Name, Value Subscription Id + private final Multimap myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); @Autowired SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; @@ -28,15 +29,13 @@ public class SubscriptionChannelRegistry { SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; @Autowired ModelConfig myModelConfig; - @Autowired - FhirContext myFhirContext; public void add(ActiveSubscription theActiveSubscription) { if (!myModelConfig.isSubscriptionMatchingEnabled()) { return; } String channelName = theActiveSubscription.getChannelName(); - myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription); + myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); if (mySubscriptionChannelCache.containsKey(channelName)) { return; @@ -46,7 +45,7 @@ public class SubscriptionChannelRegistry { Optional deliveryHandler; deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName); - deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getSubscription().getChannelType()); + deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType()); SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel); deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); @@ -55,9 +54,9 @@ public class SubscriptionChannelRegistry { public void remove(ActiveSubscription theActiveSubscription) { String channelName = theActiveSubscription.getChannelName(); - boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription); + boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); if (!removed) { - ourLog.warn("Removing unregistered subscription {}", theActiveSubscription.getIdElement(myFhirContext).getIdPart()); + ourLog.warn("Request to remove subscription {} that was not added", theActiveSubscription.getId()); } // This was the last one. Shut down the channel diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java index f1928fcb659..00e57b28b2b 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java @@ -136,7 +136,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { continue; } ourLog.debug("Subscription {} was matched by resource {} {}", - nextActiveSubscription.getSubscription().getIdElement(myFhirContext).getValue(), + nextActiveSubscription.getId(), resourceId.toUnqualifiedVersionless().getValue(), matchResult.isInMemory() ? "in-memory" : "by querying the repository"); @@ -185,7 +185,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { retval = true; trySendToDeliveryChannel(wrappedMsg, deliveryChannel); } else { - ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext)); + ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId()); } return retval; } @@ -203,7 +203,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { } private String getId(ActiveSubscription theActiveSubscription) { - return theActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue(); + return theActiveSubscription.getId(); } private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java index ab94522e5c3..00084112138 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java @@ -127,7 +127,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement private void deliver() { try { - String payload = "ping " + myActiveSubscription.getIdElement(myCtx).getIdPart(); + String payload = "ping " + myActiveSubscription.getId(); ourLog.info("Sending WebSocket message: {}", payload); mySession.sendMessage(new TextMessage(payload)); } catch (IOException e) {