change channel cache to us id instead of object
This commit is contained in:
parent
6867a57625
commit
4c8e330669
|
@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -31,11 +32,13 @@ public class ActiveSubscription {
|
||||||
|
|
||||||
private CanonicalSubscription mySubscription;
|
private CanonicalSubscription mySubscription;
|
||||||
private final String myChannelName;
|
private final String myChannelName;
|
||||||
|
private final String myId;
|
||||||
private boolean flagForDeletion;
|
private boolean flagForDeletion;
|
||||||
|
|
||||||
public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
|
public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
|
||||||
mySubscription = theSubscription;
|
mySubscription = theSubscription;
|
||||||
myChannelName = theChannelName;
|
myChannelName = theChannelName;
|
||||||
|
myId = theSubscription.getIdPart();
|
||||||
}
|
}
|
||||||
|
|
||||||
public CanonicalSubscription getSubscription() {
|
public CanonicalSubscription getSubscription() {
|
||||||
|
@ -46,10 +49,6 @@ public class ActiveSubscription {
|
||||||
return myChannelName;
|
return myChannelName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IIdType getIdElement(FhirContext theFhirContext) {
|
|
||||||
return mySubscription.getIdElement(theFhirContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getCriteriaString() {
|
public String getCriteriaString() {
|
||||||
return mySubscription.getCriteriaString();
|
return mySubscription.getCriteriaString();
|
||||||
}
|
}
|
||||||
|
@ -65,4 +64,12 @@ public class ActiveSubscription {
|
||||||
public void setFlagForDeletion(boolean theFlagForDeletion) {
|
public void setFlagForDeletion(boolean theFlagForDeletion) {
|
||||||
flagForDeletion = theFlagForDeletion;
|
flagForDeletion = theFlagForDeletion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return myId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CanonicalSubscriptionChannelType getChannelType() {
|
||||||
|
return mySubscription.getChannelType();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,8 @@ public class SubscriptionChannelRegistry {
|
||||||
|
|
||||||
private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache();
|
private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache();
|
||||||
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
|
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
|
||||||
private final Multimap<String, ActiveSubscription> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
|
// Key Channel Name, Value Subscription Id
|
||||||
|
private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
|
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
|
||||||
|
@ -28,15 +29,13 @@ public class SubscriptionChannelRegistry {
|
||||||
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
||||||
@Autowired
|
@Autowired
|
||||||
ModelConfig myModelConfig;
|
ModelConfig myModelConfig;
|
||||||
@Autowired
|
|
||||||
FhirContext myFhirContext;
|
|
||||||
|
|
||||||
public void add(ActiveSubscription theActiveSubscription) {
|
public void add(ActiveSubscription theActiveSubscription) {
|
||||||
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
|
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String channelName = theActiveSubscription.getChannelName();
|
String channelName = theActiveSubscription.getChannelName();
|
||||||
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription);
|
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
|
||||||
|
|
||||||
if (mySubscriptionChannelCache.containsKey(channelName)) {
|
if (mySubscriptionChannelCache.containsKey(channelName)) {
|
||||||
return;
|
return;
|
||||||
|
@ -46,7 +45,7 @@ public class SubscriptionChannelRegistry {
|
||||||
Optional<MessageHandler> deliveryHandler;
|
Optional<MessageHandler> deliveryHandler;
|
||||||
|
|
||||||
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
|
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
|
||||||
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getSubscription().getChannelType());
|
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
|
||||||
|
|
||||||
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
|
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
|
||||||
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
|
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
|
||||||
|
@ -55,9 +54,9 @@ public class SubscriptionChannelRegistry {
|
||||||
|
|
||||||
public void remove(ActiveSubscription theActiveSubscription) {
|
public void remove(ActiveSubscription theActiveSubscription) {
|
||||||
String channelName = theActiveSubscription.getChannelName();
|
String channelName = theActiveSubscription.getChannelName();
|
||||||
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription);
|
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
ourLog.warn("Removing unregistered subscription {}", theActiveSubscription.getIdElement(myFhirContext).getIdPart());
|
ourLog.warn("Request to remove subscription {} that was not added", theActiveSubscription.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
// This was the last one. Shut down the channel
|
// This was the last one. Shut down the channel
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ourLog.debug("Subscription {} was matched by resource {} {}",
|
ourLog.debug("Subscription {} was matched by resource {} {}",
|
||||||
nextActiveSubscription.getSubscription().getIdElement(myFhirContext).getValue(),
|
nextActiveSubscription.getId(),
|
||||||
resourceId.toUnqualifiedVersionless().getValue(),
|
resourceId.toUnqualifiedVersionless().getValue(),
|
||||||
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
|
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
|
||||||
|
|
||||||
|
@ -185,7 +185,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
retval = true;
|
retval = true;
|
||||||
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
|
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
|
||||||
} else {
|
} else {
|
||||||
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext));
|
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
|
||||||
}
|
}
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
@ -203,7 +203,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getId(ActiveSubscription theActiveSubscription) {
|
private String getId(ActiveSubscription theActiveSubscription) {
|
||||||
return theActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue();
|
return theActiveSubscription.getId();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
|
private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
|
||||||
|
|
|
@ -127,7 +127,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
|
||||||
|
|
||||||
private void deliver() {
|
private void deliver() {
|
||||||
try {
|
try {
|
||||||
String payload = "ping " + myActiveSubscription.getIdElement(myCtx).getIdPart();
|
String payload = "ping " + myActiveSubscription.getId();
|
||||||
ourLog.info("Sending WebSocket message: {}", payload);
|
ourLog.info("Sending WebSocket message: {}", payload);
|
||||||
mySession.sendMessage(new TextMessage(payload));
|
mySession.sendMessage(new TextMessage(payload));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue