Extend SubscribableChannel with SubscribableChannelManagement so we can get a handler count so we know when to destroy a channel
This commit is contained in:
parent
a3a7174c63
commit
392d7ab000
|
@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
|
|||
import ca.uhn.fhir.interceptor.api.*;
|
||||
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
|
||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
|
||||
|
@ -16,7 +17,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
@ -48,7 +48,7 @@ import javax.annotation.PreDestroy;
|
|||
@Interceptor()
|
||||
public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
|
||||
public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
|
||||
protected SubscribableChannel myMatchingChannel;
|
||||
protected ISubscribableChannel myMatchingChannel;
|
||||
@Autowired
|
||||
protected SubscriptionChannelFactory mySubscriptionChannelFactory;
|
||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
|
||||
|
|
|
@ -33,6 +33,10 @@
|
|||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-messaging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
|
|
|
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
|
@ -27,14 +28,13 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.ChannelInterceptor;
|
||||
import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public class LinkedBlockingQueueSubscribableChannel implements SubscribableChannel {
|
||||
public class LinkedBlockingQueueSubscribableChannel implements ISubscribableChannel {
|
||||
private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class);
|
||||
|
||||
private final ExecutorSubscribableChannel mySubscribableChannel;
|
||||
|
@ -100,4 +100,9 @@ public class LinkedBlockingQueueSubscribableChannel implements SubscribableChann
|
|||
public int getQueueSizeForUnitTest() {
|
||||
return myQueue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSubscriberCount() {
|
||||
return mySubscribableChannel.getSubscribers().size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Collection;
|
||||
|
@ -38,11 +37,11 @@ public class ActiveSubscription implements Closeable {
|
|||
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
|
||||
|
||||
private CanonicalSubscription mySubscription;
|
||||
private final SubscribableChannel mySubscribableChannel;
|
||||
private final ISubscribableChannel mySubscribableChannel;
|
||||
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
|
||||
private boolean flagForDeletion;
|
||||
|
||||
public ActiveSubscription(CanonicalSubscription theSubscription, SubscribableChannel theSubscribableChannel) {
|
||||
public ActiveSubscription(CanonicalSubscription theSubscription, ISubscribableChannel theSubscribableChannel) {
|
||||
mySubscription = theSubscription;
|
||||
mySubscribableChannel = theSubscribableChannel;
|
||||
}
|
||||
|
@ -51,7 +50,7 @@ public class ActiveSubscription implements Closeable {
|
|||
return mySubscription;
|
||||
}
|
||||
|
||||
public SubscribableChannel getSubscribableChannel() {
|
||||
public ISubscribableChannel getSubscribableChannel() {
|
||||
return mySubscribableChannel;
|
||||
}
|
||||
|
||||
|
@ -97,13 +96,23 @@ public class ActiveSubscription implements Closeable {
|
|||
unregister(messageHandler);
|
||||
}
|
||||
if (mySubscribableChannel instanceof DisposableBean) {
|
||||
int subscriberCount = mySubscribableChannel.getSubscriberCount();
|
||||
if (subscriberCount > 0) {
|
||||
ourLog.info("Channel for subscription {} still has {} subscribers. Not destroying.", mySubscription.getIdPart(), subscriberCount);
|
||||
} else {
|
||||
ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", mySubscription.getIdPart());
|
||||
tryDestroyChannel((DisposableBean) mySubscribableChannel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
|
||||
try {
|
||||
((DisposableBean) mySubscribableChannel).destroy();
|
||||
theSubscribableChannel.destroy();
|
||||
} catch (Exception e) {
|
||||
ourLog.error("Failed to destroy channel bean", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use close() instead
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||
|
||||
import org.springframework.integration.support.management.SubscribableChannelManagement;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
public interface ISubscribableChannel extends SubscribableChannel, SubscribableChannelManagement {
|
||||
}
|
|
@ -20,10 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
public interface ISubscribableChannelFactory {
|
||||
SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers);
|
||||
ISubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers);
|
||||
|
||||
int getDeliveryChannelConcurrentConsumers();
|
||||
|
||||
|
|
|
@ -21,13 +21,12 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory {
|
||||
@Override
|
||||
public SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) {
|
||||
public ISubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) {
|
||||
return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
|
||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
|
@ -38,12 +37,12 @@ public class SubscriptionChannelFactory {
|
|||
mySubscribableChannelFactory = theSubscribableChannelFactory;
|
||||
}
|
||||
|
||||
public SubscribableChannel newDeliveryChannel(CanonicalSubscription theCanonicalSubscription) {
|
||||
public ISubscribableChannel newDeliveryChannel(CanonicalSubscription theCanonicalSubscription) {
|
||||
String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);
|
||||
return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
|
||||
}
|
||||
|
||||
public SubscribableChannel newMatchingChannel(String theChannelName) {
|
||||
public ISubscribableChannel newMatchingChannel(String theChannelName) {
|
||||
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,9 +21,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
|
||||
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
|
||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
@ -31,7 +31,6 @@ import org.hl7.fhir.instance.model.api.IIdType;
|
|||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
|
@ -91,7 +90,7 @@ public class SubscriptionRegistry {
|
|||
Validate.notNull(theSubscription);
|
||||
|
||||
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
|
||||
SubscribableChannel deliveryChannel;
|
||||
ISubscribableChannel deliveryChannel;
|
||||
Optional<MessageHandler> deliveryHandler;
|
||||
|
||||
if (myModelConfig.isSubscriptionMatchingEnabled()) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
|||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
|
||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
|
||||
|
@ -39,7 +40,6 @@ import org.junit.BeforeClass;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.util.ArrayList;
|
||||
|
@ -74,7 +74,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
|
|||
protected static final List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList());
|
||||
protected static final List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
|
||||
protected static final List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
|
||||
private static SubscribableChannel ourSubscribableChannel;
|
||||
private static ISubscribableChannel ourSubscribableChannel;
|
||||
protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
|
||||
protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
|
||||
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -617,6 +617,7 @@
|
|||
<servicemix_saxon_version>9.5.1-5_1</servicemix_saxon_version>
|
||||
<servicemix_xmlresolver_version>1.2_5</servicemix_xmlresolver_version>
|
||||
<slf4j_version>1.7.25</slf4j_version>
|
||||
<spring_integration_version>5.1.7.RELEASE</spring_integration_version>
|
||||
<spring_version>5.1.8.RELEASE</spring_version>
|
||||
<!-- FYI: Spring Data JPA 2.1.9 causes test failures due to unexpected cascading deletes -->
|
||||
<spring_data_version>2.1.8.RELEASE</spring_data_version>
|
||||
|
@ -1345,6 +1346,11 @@
|
|||
<artifactId>spring-messaging</artifactId>
|
||||
<version>${spring_version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-core</artifactId>
|
||||
<version>${spring_integration_version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-orm</artifactId>
|
||||
|
|
Loading…
Reference in New Issue