From d59a40d01a5f90b6bdf5d70543e181588d3c3984 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Mon, 6 Aug 2018 19:03:30 -0400 Subject: [PATCH] Subscription cleanup started (won't build) --- .../BaseSubscriptionInterceptor.java | 97 ++++++++++--------- 1 file changed, 52 insertions(+), 45 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 3ac6b74f99f..27ccc048c39 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -72,16 +72,17 @@ public abstract class BaseSubscriptionInterceptor exten static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000; private SubscribableChannel myProcessingChannel; - private SubscribableChannel myDeliveryChannel; + private Map myDeliveryChannel; private ExecutorService myProcessingExecutor; private int myExecutorThreadCount; private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; private MessageHandler mySubscriptionCheckingSubscriber; private ConcurrentHashMap myIdToSubscription = new ConcurrentHashMap<>(); + private ConcurrentHashMap myIdToSubscribaleChannel = new ConcurrentHashMap<>(); + private ConcurrentHashMap myIdToDeliveryHandler = new ConcurrentHashMap<>(); private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); private ThreadPoolExecutor myDeliveryExecutor; private LinkedBlockingQueue myProcessingExecutorQueue; - private LinkedBlockingQueue myDeliveryExecutorQueue; private IFhirResourceDao mySubscriptionDao; @Autowired private List> myResourceDaos; @@ -136,6 +137,43 @@ public abstract class BaseSubscriptionInterceptor exten return retVal; } + protected abstract MessageHandler createDeliveryHandler(CanonicalSubscription theSubscription); + + protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) { + String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart(); + + LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(1000); + BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() + .namingPattern("subscription-delivery-" + subscriptionId + "-%d") + .daemon(false) + .priority(Thread.NORM_PRIORITY) + .build(); + RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { + ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size()); + StopWatch sw = new StopWatch(); + try { + executorQueue.put(theRunnable); + } catch (InterruptedException theE) { + throw new RejectedExecutionException("Task " + theRunnable.toString() + + " rejected from " + theE.toString()); + } + ourLog.info("Slot become available after {}ms", sw.getMillis()); + } + }; + ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor( + 1, + getExecutorThreadCount(), + 0L, + TimeUnit.MILLISECONDS, + executorQueue, + threadFactory, + rejectedExecutionHandler); + + return new ExecutorSubscribableChannel(deliveryExecutor); + } + protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) { org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription; @@ -255,16 +293,8 @@ public abstract class BaseSubscriptionInterceptor exten return (IFhirResourceDao) myResourceTypeToDao.get(theType); } - public SubscribableChannel getDeliveryChannel() { - return myDeliveryChannel; - } - - public void setDeliveryChannel(SubscribableChannel theDeliveryChannel) { - myDeliveryChannel = theDeliveryChannel; - } - public int getExecutorQueueSizeForUnitTests() { - return myProcessingExecutorQueue.size() + myDeliveryExecutorQueue.size(); + return myProcessingExecutorQueue.size(); } public int getExecutorThreadCount() { @@ -332,11 +362,11 @@ public abstract class BaseSubscriptionInterceptor exten mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); } - for (Enumeration keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) { - String next = keyEnum.nextElement(); + for (String next : new ArrayList<>(myIdToSubscription.keySet())) { if (!allIds.contains(next)) { ourLog.info("Unregistering Subscription/{} as it no longer exists", next); - myIdToSubscription.remove(next); + CanonicalSubscription subscription = myIdToSubscription.get(next); + unregisterSubscription(subscription.getIdElement(myCtx)); } } } @@ -357,8 +387,16 @@ public abstract class BaseSubscriptionInterceptor exten Validate.notBlank(theId.getIdPart()); Validate.notNull(theSubscription); + CanonicalSubscription canonicalized = canonicalize(theSubscription); + SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized); + MessageHandler deliveryHandler = createDeliveryHandler(canonicalized); + + deliveryChannel.subscribe(deliveryHandler); + + myIdToSubscribaleChannel.put(theId.getIdPart(), deliveryChannel); myIdToSubscription.put(theId.getIdPart(), canonicalized); + myIdToDeliveryHandler.put(theId.getIdPart(), deliveryHandler); return canonicalized; } @@ -474,37 +512,6 @@ public abstract class BaseSubscriptionInterceptor exten setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor)); } - if (getDeliveryChannel() == null) { - myDeliveryExecutorQueue = new LinkedBlockingQueue<>(1000); - BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() - .namingPattern("subscription-delivery-%d") - .daemon(false) - .priority(Thread.NORM_PRIORITY) - .build(); - RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { - ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myDeliveryExecutorQueue.size()); - StopWatch sw = new StopWatch(); - try { - myDeliveryExecutorQueue.put(theRunnable); - } catch (InterruptedException theE) { - throw new RejectedExecutionException("Task " + theRunnable.toString() + - " rejected from " + theE.toString()); - } - ourLog.info("Slot become available after {}ms", sw.getMillis()); - } - }; - myDeliveryExecutor = new ThreadPoolExecutor( - 1, - getExecutorThreadCount(), - 0L, - TimeUnit.MILLISECONDS, - myDeliveryExecutorQueue, - threadFactory, - rejectedExecutionHandler); - setDeliveryChannel(new ExecutorSubscribableChannel(myDeliveryExecutor)); - } if (mySubscriptionActivatingSubscriber == null) { mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);