From 5feec85662dfd2c7253735e1398c33f4ed5c91d9 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Mon, 13 Aug 2018 05:27:17 -0400 Subject: [PATCH] Clean up subscription processing --- .../BaseSubscriptionInterceptor.java | 57 +++++++++++-------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 1a6273d842e..2506b700b75 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.SmartLifecycle; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -102,6 +103,7 @@ public abstract class BaseSubscriptionInterceptor exten @Qualifier(BaseConfig.TASK_EXECUTOR_NAME) private AsyncTaskExecutor myAsyncTaskExecutor; private Map, IFhirResourceDao> myResourceTypeToDao; + private Semaphore myInitSubscriptionsSemaphore = new Semaphore(1); /** * Constructor @@ -350,31 +352,40 @@ public abstract class BaseSubscriptionInterceptor exten @SuppressWarnings("unused") @Scheduled(fixedDelay = 10000) public void initSubscriptions() { - SearchParameterMap map = new SearchParameterMap(); - map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode())); - map.add(Subscription.SP_STATUS, new TokenOrListParam() - .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) - .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); - map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); - - RequestDetails req = new ServletSubRequestDetails(); - req.setSubRequest(true); - - IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req); - if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) { - ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); + if (!myInitSubscriptionsSemaphore.tryAcquire()) { + return; } + try { + ourLog.debug("Starting init subscriptions"); + SearchParameterMap map = new SearchParameterMap(); + map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode())); + map.add(Subscription.SP_STATUS, new TokenOrListParam() + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); + map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); - List resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size()); + RequestDetails req = new ServletSubRequestDetails(); + req.setSubRequest(true); - Set allIds = new HashSet<>(); - for (IBaseResource resource : resourceList) { - String nextId = resource.getIdElement().getIdPart(); - allIds.add(nextId); - mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); + IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req); + if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) { + ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); + } + + List resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size()); + + Set allIds = new HashSet<>(); + for (IBaseResource resource : resourceList) { + String nextId = resource.getIdElement().getIdPart(); + allIds.add(nextId); + mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); + } + + unregisterAllSubscriptionsNotInCollection(allIds); + ourLog.trace("Finished init subscriptions - found {}", resourceList.size()); + } finally { + myInitSubscriptionsSemaphore.release(); } - - unregisterAllSubscriptionsNotInCollection(allIds); } @SuppressWarnings("unused") @@ -549,6 +560,7 @@ public abstract class BaseSubscriptionInterceptor exten public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) { SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId); if (channel != null) { + channel.unsubscribe(theMessageHandler); if (channel instanceof DisposableBean) { try { ((DisposableBean) channel).destroy(); @@ -556,10 +568,9 @@ public abstract class BaseSubscriptionInterceptor exten ourLog.error("Failed to destroy channel bean", e); } } - channel.unsubscribe(theMessageHandler); } - mySubscribableChannel.remove(theSubscriptionId, theMessageHandler); + mySubscribableChannel.remove(theSubscriptionId); } @SuppressWarnings("UnusedReturnValue")