added queue naming service

This commit is contained in:
Ken Stevens 2019-09-25 16:45:50 -04:00
parent 4570e50c03
commit 2c21d72c03
5 changed files with 38 additions and 7 deletions

View File

@ -0,0 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
public interface ISubscriptionDeliveryChannelNamer {
String nameFromSubscription(CanonicalSubscription theCanonicalSubscription);
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@ -29,16 +30,16 @@ public class SubscriptionChannelFactory {
private ISubscribableChannelFactory mySubscribableChannelFactory;
@Autowired
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired
public SubscriptionChannelFactory(ISubscribableChannelFactory theSubscribableChannelFactory) {
mySubscribableChannelFactory = theSubscribableChannelFactory;
}
public SubscribableChannel newDeliveryChannel(String theSubscriptionId, String theChannelType) {
String channelName = "subscription-delivery-" +
theChannelType +
"-" +
theSubscriptionId;
public SubscribableChannel newDeliveryChannel(CanonicalSubscription theCanonicalSubscription) {
String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);
return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
}

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.springframework.stereotype.Service;
@Service
public class SubscriptionDeliveryChannelNamer implements ISubscriptionDeliveryChannelNamer {
@Override
public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) {
String channelType = theCanonicalSubscription.getChannelType().toCode().toLowerCase();
String subscriptionId = theCanonicalSubscription.getIdPart();
return "subscription-delivery-" +
channelType +
"-" +
subscriptionId;
}
}

View File

@ -95,7 +95,7 @@ public class SubscriptionRegistry {
Optional<MessageHandler> deliveryHandler;
if (myModelConfig.isSubscriptionMatchingEnabled()) {
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(subscriptionId, canonicalized.getChannelType().toCode().toLowerCase());
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(canonicalized);
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized);
} else {
deliveryChannel = null;

View File

@ -7,6 +7,8 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.concurrency.IPointcutLatch;
import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
@ -14,6 +16,7 @@ import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
@ -80,8 +83,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
CanonicalSubscription canonicalSubscription = new CanonicalSubscription();
canonicalSubscription.setIdElement(new IdDt("test"));
canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK);
mySubscriptionRegistry.unregisterAllSubscriptions();
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(canonicalSubscription);
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);