Remove ISubscribableChannel interface. We don't need it after all.

This commit is contained in:
Ken Stevens 2019-10-02 06:35:39 -04:00
parent ed195f8dab
commit 5aa4f88da3
12 changed files with 20 additions and 38 deletions

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.interceptor.api.*;
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
@ -17,6 +16,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
@ -48,7 +48,7 @@ import javax.annotation.PreDestroy;
@Interceptor() @Interceptor()
public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer { public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
protected ISubscribableChannel myMatchingChannel; protected SubscribableChannel myMatchingChannel;
@Autowired @Autowired
protected SubscriptionChannelFactory mySubscriptionChannelFactory; protected SubscriptionChannelFactory mySubscriptionChannelFactory;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class); private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);

View File

@ -34,10 +34,6 @@
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId> <artifactId>spring-messaging</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId> <artifactId>spring-context</artifactId>

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.module;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@ -28,13 +27,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel; import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.concurrent.*;
public class LinkedBlockingQueueSubscribableChannel implements ISubscribableChannel { public class LinkedBlockingQueueSubscribableChannel implements SubscribableChannel {
private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class); private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class);
private final ExecutorSubscribableChannel mySubscribableChannel; private final ExecutorSubscribableChannel mySubscribableChannel;
@ -100,9 +100,4 @@ public class LinkedBlockingQueueSubscribableChannel implements ISubscribableChan
public int getQueueSizeForUnitTest() { public int getQueueSizeForUnitTest() {
return myQueue.size(); return myQueue.size();
} }
@Override
public int getSubscriberCount() {
return mySubscribableChannel.getSubscribers().size();
}
} }

View File

@ -21,14 +21,14 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/ */
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory { public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory {
@Override @Override
public ISubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) { public SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) {
return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers); return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers);
} }

View File

@ -1,7 +0,0 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import org.springframework.integration.support.management.SubscribableChannelManagement;
import org.springframework.messaging.SubscribableChannel;
public interface ISubscribableChannel extends SubscribableChannel, SubscribableChannelManagement {
}

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
* #L% * #L%
*/ */
import org.springframework.messaging.SubscribableChannel;
public interface ISubscribableChannelFactory { public interface ISubscribableChannelFactory {
ISubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers); SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers);
int getDeliveryChannelConcurrentConsumers(); int getDeliveryChannelConcurrentConsumers();

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
*/ */
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -37,11 +37,11 @@ public class SubscriptionChannelFactory {
mySubscribableChannelFactory = theSubscribableChannelFactory; mySubscribableChannelFactory = theSubscribableChannelFactory;
} }
public ISubscribableChannel newDeliveryChannel(String theChannelName) { public SubscribableChannel newDeliveryChannel(String theChannelName) {
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public ISubscribableChannel newMatchingChannel(String theChannelName) { public SubscribableChannel newMatchingChannel(String theChannelName) {
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
} }
} }

View File

@ -10,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection; import java.util.Collection;
@ -44,7 +45,7 @@ public class SubscriptionChannelRegistry {
return; return;
} }
ISubscribableChannel deliveryChannel; SubscribableChannel deliveryChannel;
Optional<MessageHandler> deliveryHandler; Optional<MessageHandler> deliveryHandler;
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName); deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);

View File

@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.io.Closeable; import java.io.Closeable;
import java.util.Collection; import java.util.Collection;
@ -16,10 +17,10 @@ public class SubscriptionChannelWithHandlers implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private final String myChannelName; private final String myChannelName;
private final ISubscribableChannel mySubscribableChannel; private final SubscribableChannel mySubscribableChannel;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>(); private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
public SubscriptionChannelWithHandlers(String theChannelName, ISubscribableChannel theSubscribableChannel) { public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) {
myChannelName = theChannelName; myChannelName = theChannelName;
mySubscribableChannel = theSubscribableChannel; mySubscribableChannel = theSubscribableChannel;
} }

View File

@ -10,7 +10,6 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
@ -41,6 +40,7 @@ import org.junit.BeforeClass;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList; import java.util.ArrayList;
@ -77,7 +77,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
protected static final List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static final List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static final List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static final List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static final List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>()); protected static final List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
private static ISubscribableChannel ourSubscribableChannel; private static SubscribableChannel ourSubscribableChannel;
protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);

View File

@ -16,7 +16,7 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
private static final int MOCK_FHIR_CLIENT_FAILURES = 5; private static final int MOCK_FHIR_CLIENT_FAILURES = 3;
@Autowired @Autowired
private MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider; private MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider;
@Autowired @Autowired

View File

@ -624,7 +624,6 @@
<servicemix_saxon_version>9.5.1-5_1</servicemix_saxon_version> <servicemix_saxon_version>9.5.1-5_1</servicemix_saxon_version>
<servicemix_xmlresolver_version>1.2_5</servicemix_xmlresolver_version> <servicemix_xmlresolver_version>1.2_5</servicemix_xmlresolver_version>
<slf4j_version>1.7.25</slf4j_version> <slf4j_version>1.7.25</slf4j_version>
<spring_integration_version>5.1.7.RELEASE</spring_integration_version>
<spring_version>5.1.8.RELEASE</spring_version> <spring_version>5.1.8.RELEASE</spring_version>
<!-- FYI: Spring Data JPA 2.1.9 causes test failures due to unexpected cascading deletes --> <!-- FYI: Spring Data JPA 2.1.9 causes test failures due to unexpected cascading deletes -->
<spring_data_version>2.1.8.RELEASE</spring_data_version> <spring_data_version>2.1.8.RELEASE</spring_data_version>
@ -1341,11 +1340,6 @@
<artifactId>spring-messaging</artifactId> <artifactId>spring-messaging</artifactId>
<version>${spring_version}</version> <version>${spring_version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${spring_integration_version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId> <artifactId>spring-orm</artifactId>