From ec4604c498835f674178ad8568d881493dad1dc3 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Mon, 6 Aug 2018 20:58:04 -0400 Subject: [PATCH] Work on splitting subscriptions out into separate channels --- .../BaseSubscriptionInterceptor.java | 174 ++++++++++-------- .../SubscriptionCheckingSubscriber.java | 8 +- .../email/SubscriptionEmailInterceptor.java | 32 +--- .../SubscriptionRestHookInterceptor.java | 20 +- .../SubscriptionWebsocketHandler.java | 6 +- .../SubscriptionWebsocketInterceptor.java | 29 ++- .../r4/WebsocketWithSubscriptionIdR4Test.java | 2 + 7 files changed, 142 insertions(+), 129 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 27ccc048c39..690acc5ccf8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -38,6 +38,9 @@ import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter; import ca.uhn.fhir.util.StopWatch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.hl7.fhir.exceptions.FHIRException; @@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.ExecutorSubscribableChannel; @@ -79,7 +83,7 @@ public abstract class BaseSubscriptionInterceptor exten private MessageHandler mySubscriptionCheckingSubscriber; private ConcurrentHashMap myIdToSubscription = new ConcurrentHashMap<>(); private ConcurrentHashMap myIdToSubscribaleChannel = new ConcurrentHashMap<>(); - private ConcurrentHashMap myIdToDeliveryHandler = new ConcurrentHashMap<>(); + private Multimap myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); private ThreadPoolExecutor myDeliveryExecutor; private LinkedBlockingQueue myProcessingExecutorQueue; @@ -137,43 +141,6 @@ public abstract class BaseSubscriptionInterceptor exten return retVal; } - protected abstract MessageHandler createDeliveryHandler(CanonicalSubscription theSubscription); - - protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) { - String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart(); - - LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(1000); - BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() - .namingPattern("subscription-delivery-" + subscriptionId + "-%d") - .daemon(false) - .priority(Thread.NORM_PRIORITY) - .build(); - RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { - ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size()); - StopWatch sw = new StopWatch(); - try { - executorQueue.put(theRunnable); - } catch (InterruptedException theE) { - throw new RejectedExecutionException("Task " + theRunnable.toString() + - " rejected from " + theE.toString()); - } - ourLog.info("Slot become available after {}ms", sw.getMillis()); - } - }; - ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor( - 1, - getExecutorThreadCount(), - 0L, - TimeUnit.MILLISECONDS, - executorQueue, - threadFactory, - rejectedExecutionHandler); - - return new ExecutorSubscribableChannel(deliveryExecutor); - } - protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) { org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription; @@ -272,6 +239,46 @@ public abstract class BaseSubscriptionInterceptor exten return retVal; } + protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) { + String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart(); + + LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(1000); + BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() + .namingPattern("subscription-delivery-" + subscriptionId + "-%d") + .daemon(false) + .priority(Thread.NORM_PRIORITY) + .build(); + RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { + ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size()); + StopWatch sw = new StopWatch(); + try { + executorQueue.put(theRunnable); + } catch (InterruptedException theE) { + throw new RejectedExecutionException("Task " + theRunnable.toString() + + " rejected from " + theE.toString()); + } + ourLog.info("Slot become available after {}ms", sw.getMillis()); + } + }; + ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor( + 1, + getExecutorThreadCount(), + 0L, + TimeUnit.MILLISECONDS, + executorQueue, + threadFactory, + rejectedExecutionHandler); + + return new ExecutorSubscribableChannel(deliveryExecutor); + } + + /** + * Returns an empty handler if the interceptor will manually handle registration and unregistration + */ + protected abstract Optional createDeliveryHandler(CanonicalSubscription theSubscription); + public abstract Subscription.SubscriptionChannelType getChannelType(); @SuppressWarnings("unchecked") @@ -293,6 +300,10 @@ public abstract class BaseSubscriptionInterceptor exten return (IFhirResourceDao) myResourceTypeToDao.get(theType); } + protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) { + return myIdToSubscribaleChannel.get(theSubscription.getIdElement(myCtx).getIdPart()); + } + public int getExecutorQueueSizeForUnitTests() { return myProcessingExecutorQueue.size(); } @@ -362,41 +373,36 @@ public abstract class BaseSubscriptionInterceptor exten mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); } - for (String next : new ArrayList<>(myIdToSubscription.keySet())) { - if (!allIds.contains(next)) { - ourLog.info("Unregistering Subscription/{} as it no longer exists", next); - CanonicalSubscription subscription = myIdToSubscription.get(next); - unregisterSubscription(subscription.getIdElement(myCtx)); - } - } + unregisterAllSubscriptionsNotInCollection(allIds); } @SuppressWarnings("unused") @PreDestroy public void preDestroy() { getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber); - - unregisterDeliverySubscriber(); + unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); } - protected abstract void registerDeliverySubscriber(); + public void registerHandler(String theSubscriptionId, MessageHandler theHandler) { + myIdToSubscribaleChannel.get(theSubscriptionId).subscribe(theHandler); + myIdToDeliveryHandler.put(theSubscriptionId, theHandler); + } @SuppressWarnings("UnusedReturnValue") public CanonicalSubscription registerSubscription(IIdType theId, S theSubscription) { Validate.notNull(theId); - Validate.notBlank(theId.getIdPart()); + String subscriptionId = theId.getIdPart(); + Validate.notBlank(subscriptionId); Validate.notNull(theSubscription); - CanonicalSubscription canonicalized = canonicalize(theSubscription); SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized); - MessageHandler deliveryHandler = createDeliveryHandler(canonicalized); + Optional deliveryHandler = createDeliveryHandler(canonicalized); - deliveryChannel.subscribe(deliveryHandler); + myIdToSubscribaleChannel.put(subscriptionId, deliveryChannel); + myIdToSubscription.put(subscriptionId, canonicalized); - myIdToSubscribaleChannel.put(theId.getIdPart(), deliveryChannel); - myIdToSubscription.put(theId.getIdPart(), canonicalized); - myIdToDeliveryHandler.put(theId.getIdPart(), deliveryHandler); + deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler)); return canonicalized; } @@ -482,19 +488,16 @@ public abstract class BaseSubscriptionInterceptor exten if (getProcessingChannel() == null) { myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000); - RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { - ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size()); - StopWatch sw = new StopWatch(); - try { - myProcessingExecutorQueue.put(theRunnable); - } catch (InterruptedException theE) { - throw new RejectedExecutionException("Task " + theRunnable.toString() + - " rejected from " + theE.toString()); - } - ourLog.info("Slot become available after {}ms", sw.getMillis()); + RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> { + ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size()); + StopWatch sw = new StopWatch(); + try { + myProcessingExecutorQueue.put(theRunnable); + } catch (InterruptedException theE) { + throw new RejectedExecutionException("Task " + theRunnable.toString() + + " rejected from " + theE.toString()); } + ourLog.info("Slot become available after {}ms", sw.getMillis()); }; ThreadFactory threadFactory = new BasicThreadFactory.Builder() .namingPattern("subscription-proc-%d") @@ -512,13 +515,11 @@ public abstract class BaseSubscriptionInterceptor exten setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor)); } - if (mySubscriptionActivatingSubscriber == null) { mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor); } registerSubscriptionCheckingSubscriber(); - registerDeliverySubscriber(); TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @@ -534,14 +535,39 @@ public abstract class BaseSubscriptionInterceptor exten sendToProcessingChannel(theMsg); } - protected abstract void unregisterDeliverySubscriber(); + private void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { + for (String next : new ArrayList<>(myIdToSubscription.keySet())) { + if (!theAllIds.contains(next)) { + ourLog.info("Unregistering Subscription/{}", next); + CanonicalSubscription subscription = myIdToSubscription.get(next); + unregisterSubscription(subscription.getIdElement(myCtx)); + } + } + } + + public void unregisterHandler(String theSubscriptionId, MessageHandler next) { + SubscribableChannel channel = myIdToSubscribaleChannel.get(theSubscriptionId); + if (channel != null) { + channel.unsubscribe(next); + } + + myIdToSubscribaleChannel.remove(theSubscriptionId, next); + } @SuppressWarnings("UnusedReturnValue") public CanonicalSubscription unregisterSubscription(IIdType theId) { Validate.notNull(theId); - Validate.notBlank(theId.getIdPart()); - return myIdToSubscription.remove(theId.getIdPart()); + String subscriptionId = theId.getIdPart(); + Validate.notBlank(subscriptionId); + + for (MessageHandler next : new ArrayList<>(myIdToDeliveryHandler.get(subscriptionId))) { + unregisterHandler(subscriptionId, next); + } + + myIdToSubscribaleChannel.remove(subscriptionId); + + return myIdToSubscription.remove(subscriptionId); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java index af5678ce1b0..33424b7e15f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java @@ -34,6 +34,7 @@ import org.hl7.fhir.r4.model.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import java.util.List; @@ -117,7 +118,12 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { deliveryMsg.setPayloadId(msg.getId(getContext())); ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); - getSubscriptionInterceptor().getDeliveryChannel().send(wrappedMsg); + MessageChannel deliveryChannel = getSubscriptionInterceptor().getDeliveryChannel(nextSubscription); + if (deliveryChannel != null) { + deliveryChannel.send(wrappedMsg); + } else { + ourLog.warn("Do not have deliovery channel for subscription {}", nextSubscription.getIdElement(getContext())); + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java index 560d4aa2145..35ff5fc1b64 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.email; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,13 +21,14 @@ package ca.uhn.fhir.jpa.subscription.email; */ import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; +import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; import org.apache.commons.lang3.Validate; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageHandler; -import javax.annotation.PostConstruct; +import java.util.Optional; public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor { - private SubscriptionDeliveringEmailSubscriber mySubscriptionDeliverySubscriber; /** * This is set to autowired=false just so that implementors can supply this @@ -37,6 +38,11 @@ public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor { private IEmailSender myEmailSender; private String myDefaultFromAddress = "noreply@unknown.com"; + @Override + protected Optional createDeliveryHandler(CanonicalSubscription theSubscription) { + return Optional.of(new SubscriptionDeliveringEmailSubscriber(getSubscriptionDao(), getChannelType(), this)); + } + @Override public org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType getChannelType() { return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.EMAIL; @@ -69,23 +75,5 @@ public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor { myEmailSender = theEmailSender; } - @Override - protected void registerDeliverySubscriber() { - if (mySubscriptionDeliverySubscriber == null) { - mySubscriptionDeliverySubscriber = new SubscriptionDeliveringEmailSubscriber(getSubscriptionDao(), getChannelType(), this); - } - getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber); - } -// @PostConstruct -// public void start() { -// Validate.notNull(myEmailSender, "emailSender has not been configured"); -// -// super.start(); -// } - - @Override - protected void unregisterDeliverySubscriber() { - getDeliveryChannel().unsubscribe(mySubscriptionDeliverySubscriber); - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java index 10cb72b981b..9c0558ba310 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.resthook; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,16 +21,16 @@ package ca.uhn.fhir.jpa.subscription.resthook; */ import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; +import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; +import org.springframework.messaging.MessageHandler; + +import java.util.Optional; public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor { - private SubscriptionDeliveringRestHookSubscriber mySubscriptionDeliverySubscriber; @Override - protected void registerDeliverySubscriber() { - if (mySubscriptionDeliverySubscriber == null) { - mySubscriptionDeliverySubscriber = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this); - } - getDeliveryChannel().subscribe(mySubscriptionDeliverySubscriber); + protected Optional createDeliveryHandler(CanonicalSubscription theSubscription) { + return Optional.of(new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this)); } @Override @@ -38,8 +38,4 @@ public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor return org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.RESTHOOK; } - @Override - protected void unregisterDeliverySubscriber() { - getDeliveryChannel().unsubscribe(mySubscriptionDeliverySubscriber); - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java index 0d2ae3e3659..33c541c780e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java @@ -109,12 +109,14 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement mySession = theSession; mySubscription = theSubscription; - mySubscriptionWebsocketInterceptor.getDeliveryChannel().subscribe(this); + String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart(); + mySubscriptionWebsocketInterceptor.registerHandler(subscriptionId, this); } @Override public void closing() { - mySubscriptionWebsocketInterceptor.getDeliveryChannel().unsubscribe(this); + String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart(); + mySubscriptionWebsocketInterceptor.unregisterHandler(subscriptionId, this); } private void deliver() { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java index dbdbbb1efba..95509a3180e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.websocket; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,10 +23,14 @@ package ca.uhn.fhir.jpa.subscription.websocket; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.dao.data.ISubscriptionTableDao; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; +import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; import org.hl7.fhir.r4.model.Subscription; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageHandler; import org.springframework.transaction.PlatformTransactionManager; +import java.util.Optional; + public class SubscriptionWebsocketInterceptor extends BaseSubscriptionInterceptor { @Autowired @@ -38,26 +42,15 @@ public class SubscriptionWebsocketInterceptor extends BaseSubscriptionIntercepto @Autowired private IResourceTableDao myResourceTableDao; + @Override + protected Optional createDeliveryHandler(CanonicalSubscription theSubscription) { + return Optional.empty(); + } + @Override public Subscription.SubscriptionChannelType getChannelType() { return Subscription.SubscriptionChannelType.WEBSOCKET; } - @Override - protected void registerDeliverySubscriber() { - /* - * nothing, since individual websocket connections - * register themselves - */ - } - @Override - protected void unregisterDeliverySubscriber() { - - /* - * nothing, since individual websocket connections - * register themselves - */ - - } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/WebsocketWithSubscriptionIdR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/WebsocketWithSubscriptionIdR4Test.java index 90a973e69e6..0ee08decc54 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/WebsocketWithSubscriptionIdR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/r4/WebsocketWithSubscriptionIdR4Test.java @@ -47,6 +47,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes private WebSocketClient myWebSocketClient; private SocketImplementation mySocketImplementation; + @Override @After public void after() throws Exception { super.after(); @@ -60,6 +61,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes myWebSocketClient.stop(); } + @Override @Before public void before() throws Exception { super.before();