Clean up subscription processing

This commit is contained in:
James Agnew 2018-08-13 05:27:17 -04:00
parent 5ef31a505c
commit 5feec85662
1 changed files with 34 additions and 23 deletions

View File

@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@ -102,6 +103,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@Qualifier(BaseConfig.TASK_EXECUTOR_NAME)
private AsyncTaskExecutor myAsyncTaskExecutor;
private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> myResourceTypeToDao;
private Semaphore myInitSubscriptionsSemaphore = new Semaphore(1);
/**
* Constructor
@ -350,31 +352,40 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@SuppressWarnings("unused")
@Scheduled(fixedDelay = 10000)
public void initSubscriptions() {
SearchParameterMap map = new SearchParameterMap();
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
map.add(Subscription.SP_STATUS, new TokenOrListParam()
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
RequestDetails req = new ServletSubRequestDetails();
req.setSubRequest(true);
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
if (!myInitSubscriptionsSemaphore.tryAcquire()) {
return;
}
try {
ourLog.debug("Starting init subscriptions");
SearchParameterMap map = new SearchParameterMap();
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
map.add(Subscription.SP_STATUS, new TokenOrListParam()
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
RequestDetails req = new ServletSubRequestDetails();
req.setSubRequest(true);
Set<String> allIds = new HashSet<>();
for (IBaseResource resource : resourceList) {
String nextId = resource.getIdElement().getIdPart();
allIds.add(nextId);
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
}
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
Set<String> allIds = new HashSet<>();
for (IBaseResource resource : resourceList) {
String nextId = resource.getIdElement().getIdPart();
allIds.add(nextId);
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
}
unregisterAllSubscriptionsNotInCollection(allIds);
ourLog.trace("Finished init subscriptions - found {}", resourceList.size());
} finally {
myInitSubscriptionsSemaphore.release();
}
unregisterAllSubscriptionsNotInCollection(allIds);
}
@SuppressWarnings("unused")
@ -549,6 +560,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) {
SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId);
if (channel != null) {
channel.unsubscribe(theMessageHandler);
if (channel instanceof DisposableBean) {
try {
((DisposableBean) channel).destroy();
@ -556,10 +568,9 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
ourLog.error("Failed to destroy channel bean", e);
}
}
channel.unsubscribe(theMessageHandler);
}
mySubscribableChannel.remove(theSubscriptionId, theMessageHandler);
mySubscribableChannel.remove(theSubscriptionId);
}
@SuppressWarnings("UnusedReturnValue")