Subscription cleanup started (won't build)

This commit is contained in:
jamesagnew 2018-08-06 19:03:30 -04:00
parent c98a1e0c62
commit d59a40d01a
1 changed files with 52 additions and 45 deletions

View File

@ -72,16 +72,17 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
private SubscribableChannel myProcessingChannel;
private SubscribableChannel myDeliveryChannel;
private Map<String, SubscribableChannel> myDeliveryChannel;
private ExecutorService myProcessingExecutor;
private int myExecutorThreadCount;
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SubscribableChannel> myIdToSubscribaleChannel = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, MessageHandler> myIdToDeliveryHandler = new ConcurrentHashMap<>();
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private ThreadPoolExecutor myDeliveryExecutor;
private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
private LinkedBlockingQueue<Runnable> myDeliveryExecutorQueue;
private IFhirResourceDao<?> mySubscriptionDao;
@Autowired
private List<IFhirResourceDao<?>> myResourceDaos;
@ -136,6 +137,43 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return retVal;
}
protected abstract MessageHandler createDeliveryHandler(CanonicalSubscription theSubscription);
protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) {
String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart();
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-delivery-" + subscriptionId + "-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor(
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
return new ExecutorSubscribableChannel(deliveryExecutor);
}
protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) {
org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
@ -255,16 +293,8 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
}
public SubscribableChannel getDeliveryChannel() {
return myDeliveryChannel;
}
public void setDeliveryChannel(SubscribableChannel theDeliveryChannel) {
myDeliveryChannel = theDeliveryChannel;
}
public int getExecutorQueueSizeForUnitTests() {
return myProcessingExecutorQueue.size() + myDeliveryExecutorQueue.size();
return myProcessingExecutorQueue.size();
}
public int getExecutorThreadCount() {
@ -332,11 +362,11 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
}
for (Enumeration<String> keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) {
String next = keyEnum.nextElement();
for (String next : new ArrayList<>(myIdToSubscription.keySet())) {
if (!allIds.contains(next)) {
ourLog.info("Unregistering Subscription/{} as it no longer exists", next);
myIdToSubscription.remove(next);
CanonicalSubscription subscription = myIdToSubscription.get(next);
unregisterSubscription(subscription.getIdElement(myCtx));
}
}
}
@ -357,8 +387,16 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
Validate.notBlank(theId.getIdPart());
Validate.notNull(theSubscription);
CanonicalSubscription canonicalized = canonicalize(theSubscription);
SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized);
MessageHandler deliveryHandler = createDeliveryHandler(canonicalized);
deliveryChannel.subscribe(deliveryHandler);
myIdToSubscribaleChannel.put(theId.getIdPart(), deliveryChannel);
myIdToSubscription.put(theId.getIdPart(), canonicalized);
myIdToDeliveryHandler.put(theId.getIdPart(), deliveryHandler);
return canonicalized;
}
@ -474,37 +512,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
}
if (getDeliveryChannel() == null) {
myDeliveryExecutorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-delivery-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myDeliveryExecutorQueue.size());
StopWatch sw = new StopWatch();
try {
myDeliveryExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
myDeliveryExecutor = new ThreadPoolExecutor(
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
myDeliveryExecutorQueue,
threadFactory,
rejectedExecutionHandler);
setDeliveryChannel(new ExecutorSubscribableChannel(myDeliveryExecutor));
}
if (mySubscriptionActivatingSubscriber == null) {
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);