Work on splitting subscriptions out into separate channels

This commit is contained in:
James Agnew 2018-08-06 20:58:04 -04:00
parent d59a40d01a
commit ec4604c498
7 changed files with 142 additions and 129 deletions

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -38,6 +38,9 @@ import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hl7.fhir.exceptions.FHIRException;
@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
@ -79,7 +83,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SubscribableChannel> myIdToSubscribaleChannel = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, MessageHandler> myIdToDeliveryHandler = new ConcurrentHashMap<>();
private Multimap<String, MessageHandler> myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private ThreadPoolExecutor myDeliveryExecutor;
private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
@ -137,43 +141,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return retVal;
}
protected abstract MessageHandler createDeliveryHandler(CanonicalSubscription theSubscription);
protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) {
String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart();
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-delivery-" + subscriptionId + "-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor(
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
return new ExecutorSubscribableChannel(deliveryExecutor);
}
protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) {
org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
@ -272,6 +239,46 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return retVal;
}
protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) {
String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart();
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-delivery-" + subscriptionId + "-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor(
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
return new ExecutorSubscribableChannel(deliveryExecutor);
}
/**
* Returns an empty handler if the interceptor will manually handle registration and unregistration
*/
protected abstract Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription);
public abstract Subscription.SubscriptionChannelType getChannelType();
@SuppressWarnings("unchecked")
@ -293,6 +300,10 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
}
protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) {
return myIdToSubscribaleChannel.get(theSubscription.getIdElement(myCtx).getIdPart());
}
public int getExecutorQueueSizeForUnitTests() {
return myProcessingExecutorQueue.size();
}
@ -362,41 +373,36 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
}
for (String next : new ArrayList<>(myIdToSubscription.keySet())) {
if (!allIds.contains(next)) {
ourLog.info("Unregistering Subscription/{} as it no longer exists", next);
CanonicalSubscription subscription = myIdToSubscription.get(next);
unregisterSubscription(subscription.getIdElement(myCtx));
}
}
unregisterAllSubscriptionsNotInCollection(allIds);
}
@SuppressWarnings("unused")
@PreDestroy
public void preDestroy() {
getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
unregisterDeliverySubscriber();
unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
}
protected abstract void registerDeliverySubscriber();
public void registerHandler(String theSubscriptionId, MessageHandler theHandler) {
myIdToSubscribaleChannel.get(theSubscriptionId).subscribe(theHandler);
myIdToDeliveryHandler.put(theSubscriptionId, theHandler);
}
@SuppressWarnings("UnusedReturnValue")
public CanonicalSubscription registerSubscription(IIdType theId, S theSubscription) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
String subscriptionId = theId.getIdPart();
Validate.notBlank(subscriptionId);
Validate.notNull(theSubscription);
CanonicalSubscription canonicalized = canonicalize(theSubscription);
SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized);
MessageHandler deliveryHandler = createDeliveryHandler(canonicalized);
Optional<MessageHandler> deliveryHandler = createDeliveryHandler(canonicalized);
deliveryChannel.subscribe(deliveryHandler);
myIdToSubscribaleChannel.put(subscriptionId, deliveryChannel);
myIdToSubscription.put(subscriptionId, canonicalized);
myIdToSubscribaleChannel.put(theId.getIdPart(), deliveryChannel);
myIdToSubscription.put(theId.getIdPart(), canonicalized);
myIdToDeliveryHandler.put(theId.getIdPart(), deliveryHandler);
deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler));
return canonicalized;
}
@ -482,19 +488,16 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
if (getProcessingChannel() == null) {
myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
StopWatch sw = new StopWatch();
try {
myProcessingExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
StopWatch sw = new StopWatch();
try {
myProcessingExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-proc-%d")
@ -512,13 +515,11 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
}
if (mySubscriptionActivatingSubscriber == null) {
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);
}
registerSubscriptionCheckingSubscriber();
registerDeliverySubscriber();
TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@ -534,14 +535,39 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
sendToProcessingChannel(theMsg);
}
protected abstract void unregisterDeliverySubscriber();
private void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
for (String next : new ArrayList<>(myIdToSubscription.keySet())) {
if (!theAllIds.contains(next)) {
ourLog.info("Unregistering Subscription/{}", next);
CanonicalSubscription subscription = myIdToSubscription.get(next);
unregisterSubscription(subscription.getIdElement(myCtx));
}
}
}
public void unregisterHandler(String theSubscriptionId, MessageHandler next) {
SubscribableChannel channel = myIdToSubscribaleChannel.get(theSubscriptionId);
if (channel != null) {
channel.unsubscribe(next);
}
myIdToSubscribaleChannel.remove(theSubscriptionId, next);
}
@SuppressWarnings("UnusedReturnValue")
public CanonicalSubscription unregisterSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
return myIdToSubscription.remove(theId.getIdPart());
String subscriptionId = theId.getIdPart();
Validate.notBlank(subscriptionId);
for (MessageHandler next : new ArrayList<>(myIdToDeliveryHandler.get(subscriptionId))) {
unregisterHandler(subscriptionId, next);
}
myIdToSubscribaleChannel.remove(subscriptionId);
return myIdToSubscription.remove(subscriptionId);
}

View File

@ -34,6 +34,7 @@ import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import java.util.List;
@ -117,7 +118,12 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
deliveryMsg.setPayloadId(msg.getId(getContext()));
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
getSubscriptionInterceptor().getDeliveryChannel().send(wrappedMsg);
MessageChannel deliveryChannel = getSubscriptionInterceptor().getDeliveryChannel(nextSubscription);
if (deliveryChannel != null) {
deliveryChannel.send(wrappedMsg);
} else {
ourLog.warn("Do not have deliovery channel for subscription {}", nextSubscription.getIdElement(getContext()));
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.email;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,13 +21,14 @@ package ca.uhn.fhir.jpa.subscription.email;
*/
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import javax.annotation.PostConstruct;
import java.util.Optional;
public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor {
private SubscriptionDeliveringEmailSubscriber mySubscriptionDeliverySubscriber;
/**
* This is set to autowired=false just so that implementors can supply this
@ -37,6 +38,11 @@ public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor {
private IEmailSender myEmailSender;
private String myDefaultFromAddress = "noreply@unknown.com";
@Override
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
return Optional.of(new SubscriptionDeliveringEmailSubscriber(getSubscriptionDao(), getChannelType(), this));
}
@Override
public org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType getChannelType() {
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.EMAIL;
@ -69,23 +75,5 @@ public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor {
myEmailSender = theEmailSender;
}
@Override
protected void registerDeliverySubscriber() {
if (mySubscriptionDeliverySubscriber == null) {
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringEmailSubscriber(getSubscriptionDao(), getChannelType(), this);
}
getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber);
}
// @PostConstruct
// public void start() {
// Validate.notNull(myEmailSender, "emailSender has not been configured");
//
// super.start();
// }
@Override
protected void unregisterDeliverySubscriber() {
getDeliveryChannel().unsubscribe(mySubscriptionDeliverySubscriber);
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.resthook;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,16 +21,16 @@ package ca.uhn.fhir.jpa.subscription.resthook;
*/
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.springframework.messaging.MessageHandler;
import java.util.Optional;
public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor {
private SubscriptionDeliveringRestHookSubscriber mySubscriptionDeliverySubscriber;
@Override
protected void registerDeliverySubscriber() {
if (mySubscriptionDeliverySubscriber == null) {
mySubscriptionDeliverySubscriber = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this);
}
getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber);
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
return Optional.of(new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this));
}
@Override
@ -38,8 +38,4 @@ public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor
return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK;
}
@Override
protected void unregisterDeliverySubscriber() {
getDeliveryChannel().unsubscribe(mySubscriptionDeliverySubscriber);
}
}

View File

@ -109,12 +109,14 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
mySession = theSession;
mySubscription = theSubscription;
mySubscriptionWebsocketInterceptor.getDeliveryChannel().subscribe(this);
String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart();
mySubscriptionWebsocketInterceptor.registerHandler(subscriptionId, this);
}
@Override
public void closing() {
mySubscriptionWebsocketInterceptor.getDeliveryChannel().unsubscribe(this);
String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart();
mySubscriptionWebsocketInterceptor.unregisterHandler(subscriptionId, this);
}
private void deliver() {

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.websocket;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -23,10 +23,14 @@ package ca.uhn.fhir.jpa.subscription.websocket;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.data.ISubscriptionTableDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Optional;
public class SubscriptionWebsocketInterceptor extends BaseSubscriptionInterceptor {
@Autowired
@ -38,26 +42,15 @@ public class SubscriptionWebsocketInterceptor extends BaseSubscriptionIntercepto
@Autowired
private IResourceTableDao myResourceTableDao;
@Override
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
return Optional.empty();
}
@Override
public Subscription.SubscriptionChannelType getChannelType() {
return Subscription.SubscriptionChannelType.WEBSOCKET;
}
@Override
protected void registerDeliverySubscriber() {
/*
* nothing, since individual websocket connections
* register themselves
*/
}
@Override
protected void unregisterDeliverySubscriber() {
/*
* nothing, since individual websocket connections
* register themselves
*/
}
}

View File

@ -47,6 +47,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes
private WebSocketClient myWebSocketClient;
private SocketImplementation mySocketImplementation;
@Override
@After
public void after() throws Exception {
super.after();
@ -60,6 +61,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes
myWebSocketClient.stop();
}
@Override
@Before
public void before() throws Exception {
super.before();