diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/IChannelReceiver.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/IChannelReceiver.java index 4fd2a193e7f..ad12d8a5952 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/IChannelReceiver.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/api/IChannelReceiver.java @@ -20,9 +20,10 @@ package ca.uhn.fhir.jpa.subscription.channel.api; * #L% */ +import org.springframework.beans.factory.DisposableBean; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.InterceptableChannel; -public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel { +public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel, DisposableBean { String getName(); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java index c4a647493c7..e417584382a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/impl/LinkedBlockingChannel.java @@ -52,4 +52,9 @@ public class LinkedBlockingChannel extends ExecutorSubscribableChannel implement public String getName() { return myName; } + + @Override + public void destroy() { + // nothing + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java new file mode 100644 index 00000000000..dc61980395d --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapper.java @@ -0,0 +1,75 @@ +package ca.uhn.fhir.jpa.subscription.channel.subscription; + +/*- + * #%L + * HAPI FHIR Subscription Server + * %% + * Copyright (C) 2014 - 2020 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; +import org.apache.commons.lang3.Validate; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.AbstractSubscribableChannel; +import org.springframework.messaging.support.ChannelInterceptor; + +import java.util.Set; + +public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver { + + private final IChannelReceiver myWrappedChannel; + private final MessageHandler myHandler; + + public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) { + myHandler = message -> send(message); + theChannel.subscribe(myHandler); + myWrappedChannel = theChannel; + } + + public SubscribableChannel getWrappedChannel() { + return myWrappedChannel; + } + + @Override + protected boolean sendInternal(Message theMessage, long timeout) { + Set subscribers = getSubscribers(); + Validate.isTrue(subscribers.size() > 0, "Channel has zero subscribers"); + for (MessageHandler next : subscribers) { + next.handleMessage(theMessage); + } + return true; + } + + @Override + public void destroy() throws Exception { + myWrappedChannel.destroy(); + myWrappedChannel.unsubscribe(myHandler); + } + + @Override + public void addInterceptor(ChannelInterceptor interceptor) { + super.addInterceptor(interceptor); + myWrappedChannel.addInterceptor(interceptor); + } + + + @Override + public String getName() { + return myWrappedChannel.getName(); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index 2ad7287f8d4..a8abc0394eb 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -29,12 +29,6 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import org.apache.commons.lang3.Validate; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.messaging.support.AbstractSubscribableChannel; -import org.springframework.messaging.support.ChannelInterceptor; public class SubscriptionChannelFactory { private final IChannelFactory myChannelFactory; @@ -105,44 +99,4 @@ public class SubscriptionChannelFactory { return myChannelFactory; } - public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver, DisposableBean { - - private final IChannelReceiver myWrappedChannel; - - public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) { - theChannel.subscribe(message -> send(message)); - myWrappedChannel = theChannel; - } - - public SubscribableChannel getWrappedChannel() { - return myWrappedChannel; - } - - @Override - protected boolean sendInternal(Message theMessage, long timeout) { - for (MessageHandler next : getSubscribers()) { - next.handleMessage(theMessage); - } - return true; - } - - @Override - public void destroy() throws Exception { - if (myWrappedChannel instanceof DisposableBean) { - ((DisposableBean) myWrappedChannel).destroy(); - } - } - - @Override - public void addInterceptor(ChannelInterceptor interceptor) { - super.addInterceptor(interceptor); - myWrappedChannel.addInterceptor(interceptor); - } - - - @Override - public String getName() { - return myWrappedChannel.getName(); - } - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java index 8c3d63a1841..22ce2310775 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/MatchingQueueSubscriberLoader.java @@ -1,8 +1,10 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; @@ -44,7 +46,7 @@ public class MatchingQueueSubscriberLoader { @Autowired private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; - protected SubscribableChannel myMatchingChannel; + protected IChannelReceiver myMatchingChannel; @EventListener(classes = {ContextRefreshedEvent.class}) public void handleContextRefreshEvent() { @@ -61,8 +63,10 @@ public class MatchingQueueSubscriberLoader { @SuppressWarnings("unused") @PreDestroy - public void stop() { + public void stop() throws Exception { if (myMatchingChannel != null) { + ourLog.info("Destroying matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME); + myMatchingChannel.destroy(); myMatchingChannel.unsubscribe(mySubscriptionMatchingSubscriber); myMatchingChannel.unsubscribe(mySubscriptionActivatingSubscriber); myMatchingChannel.unsubscribe(mySubscriptionRegisteringSubscriber); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapperTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapperTest.java new file mode 100644 index 00000000000..4c29d2b86ad --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/BroadcastingSubscribableChannelWrapperTest.java @@ -0,0 +1,44 @@ +package ca.uhn.fhir.jpa.subscription.channel.subscription; + +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.messaging.MessageDeliveryException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class BroadcastingSubscribableChannelWrapperTest { + + @Mock + private IChannelReceiver myReceiver; + + @Test + public void testFailIfNoSubscribers() { + BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver); + + try { + svc.send(new ResourceModifiedJsonMessage(new ResourceModifiedMessage())); + } catch (MessageDeliveryException e) { + assertThat(e.getMessage(), containsString("Channel has zero subscribers")); + } + } + + + @Test + public void testWrappedChannelDestroyed() throws Exception { + BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver); + + svc.destroy(); + + verify(myReceiver, times(1)).destroy(); + } + +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index c9354c26802..aaa6e1c0276 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -95,6 +95,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base private static SubscribableChannel ourSubscribableChannel; protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); + protected final PointcutLatch mySubscriptionAfterDelivery = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_DELIVERY); @BeforeEach public void beforeReset() { @@ -111,6 +112,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourSubscribableChannel.subscribe(subscriptionRegisteringSubscriber); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, mySubscriptionAfterDelivery); } @AfterEach diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java index 92e2bf58511..71af344a8fb 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java @@ -73,9 +73,11 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri assertEquals(2, mySubscriptionRegistry.size()); + mySubscriptionAfterDelivery.setExpectedCount(1); ourObservationListener.setExpectedCount(0); sendObservation(code, "SNOMED-CT"); ourObservationListener.clear(); + mySubscriptionAfterDelivery.awaitExpected(); assertEquals(0, ourContentTypes.size()); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java index 02874aeb5dc..c8ee403aab2 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java @@ -69,9 +69,11 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri assertEquals(2, mySubscriptionRegistry.size()); + mySubscriptionAfterDelivery.setExpectedCount(1); ourObservationListener.setExpectedCount(0); sendObservation(code, "SNOMED-CT"); ourObservationListener.clear(); + mySubscriptionAfterDelivery.awaitExpected(); assertEquals(0, ourContentTypes.size()); }