Fixes to interceptor in order to prevent zombie listeners on restart (#1988)

* Fixes to interceptor in order to prevent zombie listeners on restart

* Add test

* Add license header

* Test fixes
This commit is contained in:
James Agnew 2020-07-20 14:03:46 -04:00 committed by GitHub
parent 5db90d3e6e
commit 05735900fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 138 additions and 49 deletions

View File

@ -20,9 +20,10 @@ package ca.uhn.fhir.jpa.subscription.channel.api;
* #L%
*/
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel {
public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel, DisposableBean {
String getName();
}

View File

@ -52,4 +52,9 @@ public class LinkedBlockingChannel extends ExecutorSubscribableChannel implement
public String getName() {
return myName;
}
@Override
public void destroy() {
// nothing
}
}

View File

@ -0,0 +1,75 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.apache.commons.lang3.Validate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import java.util.Set;
public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver {
private final IChannelReceiver myWrappedChannel;
private final MessageHandler myHandler;
public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) {
myHandler = message -> send(message);
theChannel.subscribe(myHandler);
myWrappedChannel = theChannel;
}
public SubscribableChannel getWrappedChannel() {
return myWrappedChannel;
}
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
Set<MessageHandler> subscribers = getSubscribers();
Validate.isTrue(subscribers.size() > 0, "Channel has zero subscribers");
for (MessageHandler next : subscribers) {
next.handleMessage(theMessage);
}
return true;
}
@Override
public void destroy() throws Exception {
myWrappedChannel.destroy();
myWrappedChannel.unsubscribe(myHandler);
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
myWrappedChannel.addInterceptor(interceptor);
}
@Override
public String getName() {
return myWrappedChannel.getName();
}
}

View File

@ -29,12 +29,6 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
public class SubscriptionChannelFactory {
private final IChannelFactory myChannelFactory;
@ -105,44 +99,4 @@ public class SubscriptionChannelFactory {
return myChannelFactory;
}
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver, DisposableBean {
private final IChannelReceiver myWrappedChannel;
public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) {
theChannel.subscribe(message -> send(message));
myWrappedChannel = theChannel;
}
public SubscribableChannel getWrappedChannel() {
return myWrappedChannel;
}
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
for (MessageHandler next : getSubscribers()) {
next.handleMessage(theMessage);
}
return true;
}
@Override
public void destroy() throws Exception {
if (myWrappedChannel instanceof DisposableBean) {
((DisposableBean) myWrappedChannel).destroy();
}
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
myWrappedChannel.addInterceptor(interceptor);
}
@Override
public String getName() {
return myWrappedChannel.getName();
}
}
}

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
@ -44,7 +46,7 @@ public class MatchingQueueSubscriberLoader {
@Autowired
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
protected SubscribableChannel myMatchingChannel;
protected IChannelReceiver myMatchingChannel;
@EventListener(classes = {ContextRefreshedEvent.class})
public void handleContextRefreshEvent() {
@ -61,8 +63,10 @@ public class MatchingQueueSubscriberLoader {
@SuppressWarnings("unused")
@PreDestroy
public void stop() {
public void stop() throws Exception {
if (myMatchingChannel != null) {
ourLog.info("Destroying matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
myMatchingChannel.destroy();
myMatchingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
myMatchingChannel.unsubscribe(mySubscriptionActivatingSubscriber);
myMatchingChannel.unsubscribe(mySubscriptionRegisteringSubscriber);

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageDeliveryException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
class BroadcastingSubscribableChannelWrapperTest {
@Mock
private IChannelReceiver myReceiver;
@Test
public void testFailIfNoSubscribers() {
BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver);
try {
svc.send(new ResourceModifiedJsonMessage(new ResourceModifiedMessage()));
} catch (MessageDeliveryException e) {
assertThat(e.getMessage(), containsString("Channel has zero subscribers"));
}
}
@Test
public void testWrappedChannelDestroyed() throws Exception {
BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver);
svc.destroy();
verify(myReceiver, times(1)).destroy();
}
}

View File

@ -95,6 +95,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private static SubscribableChannel ourSubscribableChannel;
protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
protected final PointcutLatch mySubscriptionAfterDelivery = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_DELIVERY);
@BeforeEach
public void beforeReset() {
@ -111,6 +112,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourSubscribableChannel.subscribe(subscriptionRegisteringSubscriber);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, mySubscriptionAfterDelivery);
}
@AfterEach

View File

@ -73,9 +73,11 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
sendObservation(code, "SNOMED-CT");
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
assertEquals(0, ourContentTypes.size());
}

View File

@ -69,9 +69,11 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
sendObservation(code, "SNOMED-CT");
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
assertEquals(0, ourContentTypes.size());
}