Merge branch 'master' into test-openjdk-11

This commit is contained in:
James Agnew 2019-01-09 13:42:50 -05:00
commit f241457bc1
11 changed files with 205 additions and 46 deletions

View File

@ -143,6 +143,8 @@ public class DaoConfig {
private boolean myDisableHashBasedSearches;
private boolean myEnableInMemorySubscriptionMatching = true;
private ClientIdStrategyEnum myResourceClientIdStrategy = ClientIdStrategyEnum.ALPHANUMERIC;
private boolean mySubscriptionMatchingEnabled = true;
/**
* Constructor
*/
@ -1389,6 +1391,27 @@ public class DaoConfig {
myEnableInMemorySubscriptionMatching = theEnableInMemorySubscriptionMatching;
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
* @since 3.7.0
*/
public boolean isSubscriptionMatchingEnabled() {
return mySubscriptionMatchingEnabled;
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
* @since 3.7.0
*/
public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
}
public ModelConfig getModelConfig() {
return myModelConfig;
}

View File

@ -30,7 +30,7 @@ import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCannonicalizer;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
@ -72,6 +72,8 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingInterceptor.class);
private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
private static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
private static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
@Autowired
private PlatformTransactionManager myTransactionManager;
@ -85,7 +87,7 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
@Autowired
private FhirContext myFhirContext;
@Autowired
private SubscriptionCannonicalizer mySubscriptionCannonicalizer;
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
@ -108,9 +110,7 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
final IPrimitiveType<?> status = myFhirContext.newTerser().getSingleValueOrNull(theSubscription, SubscriptionMatcherInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
String statusString = status.getValueAsString();
final String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
final String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
if (requestedStatus.equals(statusString)) {
if (REQUESTED_STATUS.equals(statusString)) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/*
* If we're in a transaction, we don't want to try and change the status from
@ -127,7 +127,7 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
@Override
public void run() {
activateSubscription(activeStatus, theSubscription, requestedStatus);
activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS);
}
});
@ -146,9 +146,9 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
});
return true;
} else {
return activateSubscription(activeStatus, theSubscription, requestedStatus);
return activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS);
}
} else if (activeStatus.equals(statusString)) {
} else if (ACTIVE_STATUS.equals(statusString)) {
return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else {
// Status isn't "active" or "requested"
@ -220,7 +220,7 @@ public class SubscriptionActivatingInterceptor extends ServerOperationIntercepto
}
public void validateCriteria(final IBaseResource theResource) {
CanonicalSubscription subscription = mySubscriptionCannonicalizer.canonicalize(theResource);
CanonicalSubscription subscription = mySubscriptionCanonicalizer.canonicalize(theResource);
String criteria = subscription.getCriteriaString();
try {
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, criteria);

View File

@ -53,8 +53,11 @@ public class SubscriptionInterceptorLoader {
if (!supportedSubscriptionTypes.isEmpty()) {
loadSubscriptions();
ourLog.info("Registering subscription interceptors");
ourLog.info("Registering subscription activating interceptor");
myDaoConfig.registerInterceptor(mySubscriptionActivatingInterceptor);
}
if (myDaoConfig.isSubscriptionMatchingEnabled()) {
ourLog.info("Registering subscription matcher interceptor");
myDaoConfig.registerInterceptor(mySubscriptionMatcherInterceptor);
}
}

View File

@ -32,8 +32,7 @@ import org.hl7.fhir.r4.model.EventDefinition;
import org.hl7.fhir.r4.model.Subscription;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -63,6 +62,8 @@ public class CanonicalSubscription implements Serializable {
private EmailDetails myEmailDetails;
@JsonProperty("restHookDetails")
private RestHookDetails myRestHookDetails;
@JsonProperty("extensions")
private Map<String, String> myChannelExtensions;
/**
* For now we're using the R4 TriggerDefinition, but this
@ -105,7 +106,7 @@ public class CanonicalSubscription implements Serializable {
}
public List<String> getHeaders() {
return myHeaders;
return Collections.unmodifiableList(myHeaders);
}
public void setHeaders(List<? extends IPrimitiveType<String>> theHeader) {
@ -124,6 +125,19 @@ public class CanonicalSubscription implements Serializable {
}
}
public String getChannelExtension(String url) {
return myChannelExtensions.get(url);
}
public void setChannelExtensions(Map<String, String> theChannelExtensions) {
myChannelExtensions = new HashMap<>();
for (String url: theChannelExtensions.keySet()) {
if (isNotBlank(url) && isNotBlank(theChannelExtensions.get(url))) {
myChannelExtensions.put(url, theChannelExtensions.get(url));
}
}
}
public IIdType getIdElement(FhirContext theContext) {
IIdType retVal = null;
if (isNotBlank(myIdElement)) {

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -24,20 +24,29 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IPrimitiveDatatype;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Service
public class SubscriptionCannonicalizer<S extends IBaseResource> {
public class SubscriptionCanonicalizer<S extends IBaseResource> {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionCanonicalizer.class);
@Autowired
FhirContext myFhirContext;
@ -64,6 +73,7 @@ public class SubscriptionCannonicalizer<S extends IBaseResource> {
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setChannelExtensions(convertChannelExtensionsDstu2(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
} catch (FHIRException theE) {
@ -82,6 +92,7 @@ public class SubscriptionCannonicalizer<S extends IBaseResource> {
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setChannelExtensions(convertChannelExtensionsDstu3(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
@ -118,6 +129,87 @@ public class SubscriptionCannonicalizer<S extends IBaseResource> {
return retVal;
}
private Map<String, String> convertChannelExtensionsDstu2(ca.uhn.fhir.model.dstu2.resource.Subscription theSubscription) {
Map<String, String> retval = new HashMap<>();
for (ExtensionDt extension : theSubscription.getChannel().getUndeclaredExtensions()) {
String url = extension.getUrl();
if (isNotBlank(url)) {
String value = extractExtension(theSubscription, url);
if (isNotBlank(value)) {
retval.put(url, value);
}
}
}
return retval;
}
private Map<String, String> convertChannelExtensionsDstu3(org.hl7.fhir.dstu3.model.Subscription theSubscription) {
Map<String, String> retval = new HashMap<>();
for (org.hl7.fhir.dstu3.model.Extension extension : theSubscription.getChannel().getExtension()) {
String url = extension.getUrl();
if (isNotBlank(url)) {
String value = extractExtension(theSubscription, url);
if (isNotBlank(value)) {
retval.put(url, value);
}
}
}
return retval;
}
private Map<String, String> convertChannelExtensionsR4(org.hl7.fhir.r4.model.Subscription theSubscription) {
Map<String, String> retval = new HashMap<>();
for (org.hl7.fhir.r4.model.Extension extension : theSubscription.getChannel().getExtension()) {
String url = extension.getUrl();
if (isNotBlank(url)) {
String value = extractExtension(theSubscription, url);
if (isNotBlank(value)) {
retval.put(url, value);
}
}
}
return retval;
}
private String extractExtension(IBaseResource theSubscription, String theUrl) {
try {
switch (theSubscription.getStructureFhirVersionEnum()) {
case DSTU2: {
ca.uhn.fhir.model.dstu2.resource.Subscription subscription = (ca.uhn.fhir.model.dstu2.resource.Subscription) theSubscription;
List<ExtensionDt> extensions = subscription.getChannel().getUndeclaredExtensionsByUrl(theUrl);
if (extensions.size() == 0) {
return null;
}
if (extensions.size() > 1) {
throw new FHIRException("Multiple matching extensions found");
}
if (!(extensions.get(0).getValue() instanceof IPrimitiveDatatype)) {
throw new FHIRException("Extension could not be converted to a string");
}
return ((IPrimitiveDatatype<?>) extensions.get(0).getValue()).getValueAsString();
}
case DSTU3: {
org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
return subscription.getChannel().getExtensionString(theUrl);
}
case R4: {
org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
return subscription.getChannel().getExtensionString(theUrl);
}
case DSTU2_HL7ORG:
case DSTU2_1:
default: {
ourLog.error("Failed to extract extension with URL {} from subscription {}", theUrl, theSubscription.getIdElement().toUnqualified().getValue());
break;
}
}
} catch (FHIRException theE) {
ourLog.error("Failed to extract extension with URL {} from subscription {}", theUrl, theSubscription.getIdElement().toUnqualified().getValue(), theE);
}
return null;
}
protected CanonicalSubscription canonicalizeR4(IBaseResource theSubscription) {
org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
@ -127,6 +219,7 @@ public class SubscriptionCannonicalizer<S extends IBaseResource> {
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setChannelExtensions(convertChannelExtensionsR4(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
@ -46,7 +47,7 @@ public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);
@Autowired
SubscriptionCannonicalizer mySubscriptionCanonicalizer;
SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Autowired
SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@Autowired
@ -117,8 +118,13 @@ public class SubscriptionRegistry {
} else {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
}
registerSubscription(theSubscription.getIdElement(), theSubscription);
return true;
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
registerSubscription(theSubscription.getIdElement(), theSubscription);
return true;
} else {
return false;
}
}
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {

View File

@ -79,16 +79,16 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return theResource;
}
protected Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = newSubscription(theCriteria, thePayload, theEndpoint);
protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = returnedActiveSubscription(theCriteria, thePayload, theEndpoint);
return sendResource(subscription);
}
protected Subscription newSubscription(String theCriteria, String thePayload, String theEndpoint) {
protected Subscription returnedActiveSubscription(String theCriteria, String thePayload, String theEndpoint) {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria(theCriteria);
++idCounter;
IdType id = new IdType("Subscription", idCounter);

View File

@ -15,28 +15,44 @@ import static org.junit.Assert.assertEquals;
public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
private String myCode = "1000000050";
@Before
public void loadSubscriptions() {
@Test
public void testSubscriptionLoaderFhirClient() throws Exception {
String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml";
List<Subscription> subs = new ArrayList<>();
subs.add(newSubscription(criteria1, payload, ourListenerServerBase));
subs.add(newSubscription(criteria2, payload, ourListenerServerBase));
subs.add(returnedActiveSubscription(criteria1, payload, ourListenerServerBase));
subs.add(returnedActiveSubscription(criteria2, payload, ourListenerServerBase));
IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid");
initSubscriptionLoader(bundle);
}
@Test
public void testSubscriptionLoaderFhirClient() throws Exception {
sendObservation(myCode, "SNOMED-CT");
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
}
@Test
public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws Exception {
String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + myCode + "111&_format=xml";
List<Subscription> subs = new ArrayList<>();
subs.add(returnedActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED));
subs.add(returnedActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED));
IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid");
initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT");
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
}
}

View File

@ -22,8 +22,8 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");
@ -40,8 +40,8 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");
@ -58,8 +58,8 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code;
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");

View File

@ -22,8 +22,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");
@ -40,8 +40,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");
@ -58,8 +58,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code;
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");

View File

@ -38,9 +38,13 @@
by two new interceptors: SubscriptionActivatingInterceptor that is responsible for activating subscriptions
and SubscriptionMatchingInterceptor that is responsible for matching incoming resources against activated
subscriptions. Call DaoConfig.addSupportedSubscriptionType(type) to configure which subscription types
are supported in your environment. The helper method SubscriptionInterceptorLoader.registerInterceptors()
are supported in your environment. If you are processing subscriptions on a separate server and only want
to activate subscriptions on this server, you should set DaoConfig.setSubscriptionMatchingEnabled to false.
The helper method SubscriptionInterceptorLoader.registerInterceptors()
will check if any subscription types are supported, and if so then load active subscriptions into the
SubscriptionRegistry and then register both the activating and matching interceptors.
SubscriptionRegistry and register the subscription activating interceptor. This method also registers
the subscription matching interceptor (that matches incoming resources and sends matches to subscription
channels) only if DaoConfig.isSubscriptionMatchingEnabled is true.
See https://github.com/jamesagnew/hapi-fhir/wiki/Proposed-Subscription-Design-Change for more
details.
</action>