all tests pass
This commit is contained in:
parent
a584e15251
commit
19afcb7e09
|
@ -8,6 +8,8 @@ import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
|
|||
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
|
||||
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
|
||||
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
|
||||
import ca.uhn.fhir.jpa.subscription.ISubscriptionTriggeringSvc;
|
||||
import ca.uhn.fhir.jpa.subscription.SubscriptionTriggeringSvcImpl;
|
||||
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
|
||||
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory;
|
||||
|
|
|
@ -143,7 +143,6 @@ public class DaoConfig {
|
|||
private boolean myDisableHashBasedSearches;
|
||||
private boolean myEnableInMemorySubscriptionMatching = true;
|
||||
private ClientIdStrategyEnum myResourceClientIdStrategy = ClientIdStrategyEnum.ALPHANUMERIC;
|
||||
private boolean mySubscriptionMatchingEnabled = true;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -530,7 +529,7 @@ public class DaoConfig {
|
|||
* This may be used to optionally register server interceptors directly against the DAOs.
|
||||
*/
|
||||
public void setInterceptors(IServerInterceptor... theInterceptor) {
|
||||
setInterceptors(new ArrayList<IServerInterceptor>());
|
||||
setInterceptors(new ArrayList<>());
|
||||
if (theInterceptor != null && theInterceptor.length != 0) {
|
||||
getInterceptors().addAll(Arrays.asList(theInterceptor));
|
||||
}
|
||||
|
@ -1308,8 +1307,7 @@ public class DaoConfig {
|
|||
public void setSearchPreFetchThresholds(List<Integer> thePreFetchThresholds) {
|
||||
Validate.isTrue(thePreFetchThresholds.size() > 0, "thePreFetchThresholds must not be empty");
|
||||
int last = 0;
|
||||
for (Integer nextInteger : thePreFetchThresholds) {
|
||||
int nextInt = nextInteger.intValue();
|
||||
for (Integer nextInt : thePreFetchThresholds) {
|
||||
Validate.isTrue(nextInt > 0 || nextInt == -1, nextInt + " is not a valid prefetch threshold");
|
||||
Validate.isTrue(nextInt != last, "Prefetch thresholds must be sequential");
|
||||
Validate.isTrue(nextInt > last || nextInt == -1, "Prefetch thresholds must be sequential");
|
||||
|
@ -1398,7 +1396,7 @@ public class DaoConfig {
|
|||
*/
|
||||
|
||||
public boolean isSubscriptionMatchingEnabled() {
|
||||
return mySubscriptionMatchingEnabled;
|
||||
return myModelConfig.isSubscriptionMatchingEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1407,9 +1405,8 @@ public class DaoConfig {
|
|||
* @since 3.7.0
|
||||
*/
|
||||
|
||||
|
||||
public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
|
||||
mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
|
||||
myModelConfig.setSubscriptionMatchingEnabled(theSubscriptionMatchingEnabled);
|
||||
}
|
||||
|
||||
public ModelConfig getModelConfig() {
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
|
||||
|
||||
public interface IResourceModifiedConsumer {
|
||||
void submitResourceModified(ResourceModifiedMessage theMsg);
|
||||
}
|
|
@ -37,50 +37,41 @@ import java.util.Set;
|
|||
public class SubscriptionInterceptorLoader {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionInterceptorLoader.class);
|
||||
|
||||
// TODO KHS these beans are late loaded because we don't want to run their @PostConstruct and @Scheduled method if they're
|
||||
// not required. Recommend removing @PostConstruct from these classes and instead call those methods in register interceptors below.
|
||||
// @Schedule will be tricker to resolve
|
||||
|
||||
@Autowired
|
||||
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
|
||||
@Autowired
|
||||
private SubscriptionActivatingInterceptor mySubscriptionActivatingInterceptor;
|
||||
|
||||
@Autowired
|
||||
DaoConfig myDaoConfig;
|
||||
@Autowired
|
||||
private ApplicationContext myAppicationContext;
|
||||
@Autowired
|
||||
private SubscriptionRegistry mySubscriptionRegistry;
|
||||
@Autowired
|
||||
private ApplicationContext myAppicationContext;
|
||||
|
||||
public void registerInterceptors() {
|
||||
Set<Subscription.SubscriptionChannelType> supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();
|
||||
|
||||
if (!supportedSubscriptionTypes.isEmpty()) {
|
||||
loadSubscriptions();
|
||||
if (mySubscriptionActivatingInterceptor == null) {
|
||||
mySubscriptionActivatingInterceptor = myAppicationContext.getBean(SubscriptionActivatingInterceptor.class);
|
||||
}
|
||||
ourLog.info("Registering subscription activating interceptor");
|
||||
myDaoConfig.registerInterceptor(mySubscriptionActivatingInterceptor);
|
||||
}
|
||||
if (myDaoConfig.isSubscriptionMatchingEnabled()) {
|
||||
if (mySubscriptionMatcherInterceptor == null) {
|
||||
mySubscriptionMatcherInterceptor = myAppicationContext.getBean(SubscriptionMatcherInterceptor.class);
|
||||
}
|
||||
mySubscriptionMatcherInterceptor.start();
|
||||
ourLog.info("Registering subscription matcher interceptor");
|
||||
myDaoConfig.registerInterceptor(mySubscriptionMatcherInterceptor);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void loadSubscriptions() {
|
||||
ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
|
||||
// Load subscriptions into the SubscriptionRegistry
|
||||
// Activate scheduled subscription loads into the SubscriptionRegistry
|
||||
myAppicationContext.getBean(SubscriptionLoader.class);
|
||||
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void unregisterInterceptorsForUnitTest() {
|
||||
void unregisterInterceptorsForUnitTest() {
|
||||
myDaoConfig.unregisterInterceptor(mySubscriptionActivatingInterceptor);
|
||||
myDaoConfig.unregisterInterceptor(mySubscriptionMatcherInterceptor);
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscr
|
|||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -42,12 +43,12 @@ import javax.annotation.PreDestroy;
|
|||
|
||||
@Component
|
||||
@Lazy
|
||||
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter {
|
||||
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter implements IResourceModifiedConsumer {
|
||||
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
|
||||
|
||||
public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
|
||||
public static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||
public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||
private static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
|
||||
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||
private SubscribableChannel myProcessingChannel;
|
||||
|
||||
@Autowired
|
||||
|
@ -64,7 +65,6 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
|||
super();
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
if (myProcessingChannel == null) {
|
||||
myProcessingChannel = mySubscriptionChannelFactory.newMatchingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
|
||||
|
@ -77,7 +77,10 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
|||
@SuppressWarnings("unused")
|
||||
@PreDestroy
|
||||
public void preDestroy() {
|
||||
myProcessingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
|
||||
|
||||
if (myProcessingChannel != null) {
|
||||
myProcessingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -100,8 +103,9 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
|||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
|
||||
private void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
|
||||
ourLog.trace("Sending resource modified message to processing channel");
|
||||
Validate.notNull(myProcessingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
|
||||
myProcessingChannel.send(new ResourceModifiedJsonMessage(theMessage));
|
||||
}
|
||||
|
||||
|
@ -117,7 +121,7 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
|
||||
LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
|
||||
return (LinkedBlockingQueueSubscribableChannel) myProcessingChannel;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,10 +72,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
|
|||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
@Service
|
||||
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, ApplicationContextAware {
|
||||
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
|
||||
|
||||
public static final int DEFAULT_MAX_SUBMIT = 10000;
|
||||
private static final int DEFAULT_MAX_SUBMIT = 10000;
|
||||
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
|
@ -88,11 +88,10 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
@Autowired
|
||||
private MatchUrlService myMatchUrlService;
|
||||
@Autowired
|
||||
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
|
||||
private IResourceModifiedConsumer myResourceModifiedConsumer;
|
||||
|
||||
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
|
||||
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
|
||||
private ApplicationContext myAppCtx;
|
||||
private ExecutorService myExecutorService;
|
||||
|
||||
@Override
|
||||
|
@ -105,7 +104,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
if (theSubscriptionId != null) {
|
||||
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
|
||||
IIdType subscriptionId = theSubscriptionId;
|
||||
if (subscriptionId.hasResourceType() == false) {
|
||||
if (!subscriptionId.hasResourceType()) {
|
||||
subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
|
||||
}
|
||||
subscriptionDao.read(subscriptionId);
|
||||
|
@ -128,7 +127,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
|
||||
// Search URLs must be valid
|
||||
for (StringParam next : searchUrls) {
|
||||
if (next.getValue().contains("?") == false) {
|
||||
if (!next.getValue().contains("?")) {
|
||||
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
|
||||
}
|
||||
}
|
||||
|
@ -163,7 +162,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
return;
|
||||
}
|
||||
|
||||
String activeJobIds = myActiveJobs.stream().map(t -> t.getJobId()).collect(Collectors.joining(", "));
|
||||
String activeJobIds = myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
|
||||
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
|
||||
|
||||
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
|
||||
|
@ -290,7 +289,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
|
||||
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
|
||||
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
|
||||
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
|
||||
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
|
||||
IBaseResource resourceToTrigger = dao.read(resourceId);
|
||||
|
||||
return submitResource(theSubscriptionId, resourceToTrigger);
|
||||
|
@ -306,7 +305,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
return myExecutorService.submit(() -> {
|
||||
for (int i = 0; ; i++) {
|
||||
try {
|
||||
mySubscriptionMatcherInterceptor.submitResourceModified(msg);
|
||||
myResourceModifiedConsumer.submitResourceModified(msg);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
if (i >= 3) {
|
||||
|
@ -329,11 +328,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
myAppCtx = applicationContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of resources that will be submitted in a single pass
|
||||
*/
|
||||
|
@ -346,7 +340,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
myMaxSubmitPerPass = maxSubmitPerPass;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
|
||||
|
@ -393,67 +386,67 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
|||
private String myCurrentSearchResourceType;
|
||||
private int myCurrentSearchLastUploadedIndex;
|
||||
|
||||
public Integer getCurrentSearchCount() {
|
||||
Integer getCurrentSearchCount() {
|
||||
return myCurrentSearchCount;
|
||||
}
|
||||
|
||||
public void setCurrentSearchCount(Integer theCurrentSearchCount) {
|
||||
void setCurrentSearchCount(Integer theCurrentSearchCount) {
|
||||
myCurrentSearchCount = theCurrentSearchCount;
|
||||
}
|
||||
|
||||
public String getCurrentSearchResourceType() {
|
||||
String getCurrentSearchResourceType() {
|
||||
return myCurrentSearchResourceType;
|
||||
}
|
||||
|
||||
public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
|
||||
void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
|
||||
myCurrentSearchResourceType = theCurrentSearchResourceType;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
String getJobId() {
|
||||
return myJobId;
|
||||
}
|
||||
|
||||
public void setJobId(String theJobId) {
|
||||
void setJobId(String theJobId) {
|
||||
myJobId = theJobId;
|
||||
}
|
||||
|
||||
public String getSubscriptionId() {
|
||||
String getSubscriptionId() {
|
||||
return mySubscriptionId;
|
||||
}
|
||||
|
||||
public void setSubscriptionId(String theSubscriptionId) {
|
||||
void setSubscriptionId(String theSubscriptionId) {
|
||||
mySubscriptionId = theSubscriptionId;
|
||||
}
|
||||
|
||||
public List<String> getRemainingResourceIds() {
|
||||
List<String> getRemainingResourceIds() {
|
||||
return myRemainingResourceIds;
|
||||
}
|
||||
|
||||
public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
|
||||
void setRemainingResourceIds(List<String> theRemainingResourceIds) {
|
||||
myRemainingResourceIds = theRemainingResourceIds;
|
||||
}
|
||||
|
||||
public List<String> getRemainingSearchUrls() {
|
||||
List<String> getRemainingSearchUrls() {
|
||||
return myRemainingSearchUrls;
|
||||
}
|
||||
|
||||
public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
|
||||
void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
|
||||
myRemainingSearchUrls = theRemainingSearchUrls;
|
||||
}
|
||||
|
||||
public String getCurrentSearchUuid() {
|
||||
String getCurrentSearchUuid() {
|
||||
return myCurrentSearchUuid;
|
||||
}
|
||||
|
||||
public void setCurrentSearchUuid(String theCurrentSearchUuid) {
|
||||
void setCurrentSearchUuid(String theCurrentSearchUuid) {
|
||||
myCurrentSearchUuid = theCurrentSearchUuid;
|
||||
}
|
||||
|
||||
public int getCurrentSearchLastUploadedIndex() {
|
||||
int getCurrentSearchLastUploadedIndex() {
|
||||
return myCurrentSearchLastUploadedIndex;
|
||||
}
|
||||
|
||||
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
|
||||
void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
|
||||
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,14 +59,14 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
|||
protected static RestfulServer ourRestServer;
|
||||
protected static String ourServerBase;
|
||||
protected static SearchParamRegistryR4 ourSearchParamRegistry;
|
||||
protected static DatabaseBackedPagingProvider ourPagingProvider;
|
||||
private static DatabaseBackedPagingProvider ourPagingProvider;
|
||||
protected static ISearchDao mySearchEntityDao;
|
||||
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||
protected static GenericWebApplicationContext ourWebApplicationContext;
|
||||
protected static SubscriptionMatcherInterceptor ourSubscriptionMatcherInterceptor;
|
||||
private static GenericWebApplicationContext ourWebApplicationContext;
|
||||
private static SubscriptionMatcherInterceptor ourSubscriptionMatcherInterceptor;
|
||||
private static Server ourServer;
|
||||
protected IGenericClient ourClient;
|
||||
protected ResourceCountCache ourResourceCountsCache;
|
||||
ResourceCountCache ourResourceCountsCache;
|
||||
private TerminologyUploaderProviderR4 myTerminologyUploaderProvider;
|
||||
private Object ourGraphQLProvider;
|
||||
private boolean ourRestHookSubscriptionInterceptorRequested;
|
||||
|
@ -162,6 +162,7 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
|||
mySearchEntityDao = wac.getBean(ISearchDao.class);
|
||||
ourSearchParamRegistry = wac.getBean(SearchParamRegistryR4.class);
|
||||
ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class);
|
||||
ourSubscriptionMatcherInterceptor.start();
|
||||
|
||||
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ public class ModelConfig {
|
|||
* <li><code>"http://hl7.org/fhir/StructureDefinition/*"</code></li>
|
||||
* </ul>
|
||||
*/
|
||||
public static final Set<String> DEFAULT_LOGICAL_BASE_URLS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(
|
||||
public static final Set<String> DEFAULT_LOGICAL_BASE_URLS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
"http://hl7.org/fhir/ValueSet/*",
|
||||
"http://hl7.org/fhir/CodeSystem/*",
|
||||
"http://hl7.org/fhir/valueset-*",
|
||||
|
@ -57,6 +57,7 @@ public class ModelConfig {
|
|||
private boolean myDefaultSearchParamsCanBeOverridden = false;
|
||||
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
|
||||
private String myEmailFromAddress = "noreply@unknown.com";
|
||||
private boolean mySubscriptionMatchingEnabled = true;
|
||||
|
||||
/**
|
||||
* If set to {@code true} the default search params (i.e. the search parameters that are
|
||||
|
@ -225,7 +226,7 @@ public class ModelConfig {
|
|||
}
|
||||
}
|
||||
|
||||
HashSet<String> treatBaseUrlsAsLocal = new HashSet<String>();
|
||||
HashSet<String> treatBaseUrlsAsLocal = new HashSet<>();
|
||||
for (String next : ObjectUtils.defaultIfNull(theTreatBaseUrlsAsLocal, new HashSet<String>())) {
|
||||
while (next.endsWith("/")) {
|
||||
next = next.substring(0, next.length() - 1);
|
||||
|
@ -320,6 +321,27 @@ public class ModelConfig {
|
|||
return Collections.unmodifiableSet(mySupportedSubscriptionTypes);
|
||||
}
|
||||
|
||||
/**
|
||||
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
|
||||
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
|
||||
* @since 3.7.0
|
||||
*/
|
||||
|
||||
public boolean isSubscriptionMatchingEnabled() {
|
||||
return mySubscriptionMatchingEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
|
||||
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
|
||||
* @since 3.7.0
|
||||
*/
|
||||
|
||||
|
||||
public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
|
||||
mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void clearSupportedSubscriptionTypesForUnitTest() {
|
||||
mySupportedSubscriptionTypes.clear();
|
||||
|
|
|
@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
|
||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
@ -42,16 +43,19 @@ import java.util.Optional;
|
|||
* handlers are all caches in this registry so they can be removed it the subscription is deleted.
|
||||
*/
|
||||
|
||||
// TODO KHS Does jpa need a subscription registry if matching is disabled?
|
||||
@Component
|
||||
public class SubscriptionRegistry {
|
||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);
|
||||
|
||||
@Autowired
|
||||
SubscriptionCanonicalizer mySubscriptionCanonicalizer;
|
||||
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
|
||||
@Autowired
|
||||
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
|
||||
@Autowired
|
||||
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
|
||||
@Autowired
|
||||
ModelConfig myModelConfig;
|
||||
|
||||
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
|
||||
|
||||
|
@ -71,18 +75,26 @@ public class SubscriptionRegistry {
|
|||
}
|
||||
|
||||
@SuppressWarnings("UnusedReturnValue")
|
||||
public CanonicalSubscription registerSubscription(IIdType theId, IBaseResource theSubscription) {
|
||||
private CanonicalSubscription registerSubscription(IIdType theId, IBaseResource theSubscription) {
|
||||
Validate.notNull(theId);
|
||||
String subscriptionId = theId.getIdPart();
|
||||
Validate.notBlank(subscriptionId);
|
||||
Validate.notNull(theSubscription);
|
||||
|
||||
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
|
||||
SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(subscriptionId, canonicalized.getChannelType().toCode().toLowerCase());
|
||||
Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized);
|
||||
SubscribableChannel deliveryChannel;
|
||||
Optional<MessageHandler> deliveryHandler;
|
||||
|
||||
if (myModelConfig.isSubscriptionMatchingEnabled()) {
|
||||
deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(subscriptionId, canonicalized.getChannelType().toCode().toLowerCase());
|
||||
deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized);
|
||||
} else {
|
||||
deliveryChannel = null;
|
||||
deliveryHandler = Optional.empty();
|
||||
}
|
||||
|
||||
ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, deliveryChannel);
|
||||
deliveryHandler.ifPresent(handler -> activeSubscription.register(handler));
|
||||
deliveryHandler.ifPresent(activeSubscription::register);
|
||||
|
||||
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
|
||||
|
||||
|
|
|
@ -112,8 +112,6 @@ public class FhirAutoConfiguration {
|
|||
|
||||
private final IPagingProvider pagingProvider;
|
||||
|
||||
private final List<IServerInterceptor> interceptors;
|
||||
|
||||
private final List<FhirRestfulServerCustomizer> customizers;
|
||||
|
||||
public FhirRestfulServerConfiguration(
|
||||
|
@ -127,7 +125,6 @@ public class FhirAutoConfiguration {
|
|||
this.fhirContext = fhirContext;
|
||||
this.resourceProviders = resourceProviders.getIfAvailable();
|
||||
this.pagingProvider = pagingProvider.getIfAvailable();
|
||||
this.interceptors = interceptors.getIfAvailable();
|
||||
this.customizers = customizers.getIfAvailable();
|
||||
}
|
||||
|
||||
|
@ -154,7 +151,6 @@ public class FhirAutoConfiguration {
|
|||
setFhirContext(this.fhirContext);
|
||||
setResourceProviders(this.resourceProviders);
|
||||
setPagingProvider(this.pagingProvider);
|
||||
setInterceptors(this.interceptors);
|
||||
|
||||
setServerAddressStrategy(new HardcodedServerAddressStrategy(this.properties.getServer().getPath()));
|
||||
|
||||
|
|
Loading…
Reference in New Issue