Merge branch 'master' into batch-empi-job

This commit is contained in:
Tadgh 2020-07-21 07:54:18 -07:00
commit 4ec2be4ff5
24 changed files with 611 additions and 84 deletions

View File

@ -1067,9 +1067,19 @@ class ParserState<T> {
* At some point it would be good to write code which can present a view
* of one type backed by another type and use that.
*/
FhirTerser t = myContext.newTerser();
// Clean up the cached resources
myGlobalResources.remove(myInstance);
myGlobalReferences.removeAll(t.getAllPopulatedChildElementsOfType(myInstance, IBaseReference.class));
IParser parser = myContext.newJsonParser();
String asString = parser.encodeResourceToString(myInstance);
myInstance = parser.parseResource(wantedProfileType, asString);
// Add newly created instance
myGlobalResources.add(myInstance);
myGlobalReferences.addAll(t.getAllPopulatedChildElementsOfType(myInstance, IBaseReference.class));
}
}
}

View File

@ -28,6 +28,7 @@ import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@ -49,4 +50,27 @@ public class SearchParameterUtil {
return retVal;
}
@Nullable
public static String getCode(FhirContext theContext, IBaseResource theResource) {
return getStringChild(theContext, theResource, "code");
}
@Nullable
public static String getExpression(FhirContext theFhirContext, IBaseResource theResource) {
return getStringChild(theFhirContext, theResource, "expression");
}
private static String getStringChild(FhirContext theFhirContext, IBaseResource theResource, String theChildName) {
Validate.notNull(theFhirContext, "theContext must not be null");
Validate.notNull(theResource, "theResource must not be null");
RuntimeResourceDefinition def = theFhirContext.getResourceDefinition(theResource);
BaseRuntimeChildDefinition base = def.getChildByName(theChildName);
return base
.getAccessor()
.getFirstValueOrNull(theResource)
.map(t -> ((IPrimitiveType<?>) t))
.map(t -> t.getValueAsString())
.orElse(null);
}
}

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 1848
title: "When parsing a resource into a custom structure (e.g. a custom class extended a built-in FHIR resource class),
inter-resouirce references could contain invaliud java refeences to other resources. Thanks to Christian Ohr for reporting
and fixing!"

View File

@ -89,6 +89,7 @@ import ca.uhn.fhir.validation.IValidatorModule;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import org.apache.commons.lang3.Validate;
import org.apache.commons.text.WordUtils;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseMetaType;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
@ -122,7 +123,10 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends BaseHapiFhirDao<T> implements IFhirResourceDao<T> {
@ -759,8 +763,16 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
if (myDaoConfig.isMarkResourcesForReindexingUponSearchParameterChange()) {
if (isNotBlank(theExpression) && theExpression.contains(".")) {
final String resourceType = theExpression.substring(0, theExpression.indexOf('.'));
String expression = defaultString(theExpression);
Set<String> typesToMark = myDaoRegistry
.getRegisteredDaoTypes()
.stream()
.filter(t -> WordUtils.containsAllWords(expression, t))
.collect(Collectors.toSet());
for (String resourceType : typesToMark) {
ourLog.debug("Marking all resources of type {} for reindexing due to updated search parameter with path: {}", resourceType, theExpression);
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
@ -772,6 +784,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
ourLog.debug("Marked resources of type {} for reindexing", resourceType);
}
}
mySearchParamRegistry.requestRefresh();

View File

@ -89,9 +89,30 @@ public class FhirResourceDaoSearchParameterR4 extends BaseHapiFhirResourceDao<Se
}
public static void validateSearchParam(ISearchParamRegistry theSearchParamRegistry, ISearchParamExtractor theSearchParamExtractor, String theCode, Enum<?> theType, Enum<?> theStatus, List<? extends IPrimitiveType> theBase, String theExpression, FhirContext theContext, DaoConfig theDaoConfig) {
/*
* If overriding built-in SPs is disabled on this server, make sure we aren't
* doing that
*/
if (theDaoConfig.getModelConfig().isDefaultSearchParamsCanBeOverridden() == false) {
for (IPrimitiveType<?> nextBaseType : theBase) {
String nextBase = nextBaseType.getValueAsString();
RuntimeSearchParam existingSearchParam = theSearchParamRegistry.getActiveSearchParam(nextBase, theCode);
if (existingSearchParam != null && existingSearchParam.getId() == null) {
throw new UnprocessableEntityException("Can not override built-in search parameter " + nextBase + ":" + theCode + " because overriding is disabled on this server");
}
}
}
/*
* Everything below is validating that the SP is actually valid. We'll only do that if the
* SPO is active, so that we don't block people from uploading works-in-progress
*/
if (theStatus == null) {
throw new UnprocessableEntityException("SearchParameter.status is missing or invalid");
}
if (!theStatus.name().equals("ACTIVE")) {
return;
}
if (ElementUtil.isEmpty(theBase) && (theType == null || !Enumerations.SearchParamType.COMPOSITE.name().equals(theType.name()))) {
throw new UnprocessableEntityException("SearchParameter.base is missing");
@ -150,18 +171,6 @@ public class FhirResourceDaoSearchParameterR4 extends BaseHapiFhirResourceDao<Se
}
} // if have expression
// If overriding built-in SPs is disabled on this server, make sure we aren't
// doing that
if (theDaoConfig.getModelConfig().isDefaultSearchParamsCanBeOverridden() == false) {
for (IPrimitiveType<?> nextBaseType : theBase) {
String nextBase = nextBaseType.getValueAsString();
RuntimeSearchParam existingSearchParam = theSearchParamRegistry.getActiveSearchParam(nextBase, theCode);
if (existingSearchParam != null && existingSearchParam.getId() == null) {
throw new UnprocessableEntityException("Can not override built-in search parameter " + nextBase + ":" + theCode + " because overriding is disabled on this server");
}
}
}
}
}

View File

@ -31,8 +31,11 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.SearchParameterUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
@ -55,6 +58,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* @since 5.1.0
*/
@ -73,9 +79,9 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
boolean enabled = true;
@Autowired
private FhirContext fhirContext;
private FhirContext myFhirContext;
@Autowired
private DaoRegistry daoRegistry;
private DaoRegistry myDaoRegistry;
@Autowired
private IValidationSupport validationSupport;
@Autowired
@ -90,7 +96,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
@PostConstruct
public void initialize() {
switch (fhirContext.getVersion().getVersion()) {
switch (myFhirContext.getVersion().getVersion()) {
case R5:
case R4:
case DSTU3:
@ -100,7 +106,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
case DSTU2_HL7ORG:
case DSTU2_1:
default: {
ourLog.info("IG installation not supported for version: {}", fhirContext.getVersion().getVersion());
ourLog.info("IG installation not supported for version: {}", myFhirContext.getVersion().getVersion());
enabled = false;
}
}
@ -159,7 +165,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
String version = npmPackage.getNpm().get("version").getAsString();
String fhirVersion = npmPackage.fhirVersion();
String currentFhirVersion = fhirContext.getVersion().getVersion().getFhirVersionString();
String currentFhirVersion = myFhirContext.getVersion().getVersion().getFhirVersionString();
assertFhirVersionsAreCompatible(fhirVersion, currentFhirVersion);
List<String> installTypes;
@ -264,9 +270,9 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
for (String file : filesForType) {
try {
byte[] content = pkg.getFolders().get("package").fetchFile(file);
resources.add(fhirContext.newJsonParser().parseResource(new String(content)));
resources.add(myFhirContext.newJsonParser().parseResource(new String(content)));
} catch (IOException e) {
throw new InternalErrorException("Cannot install resource of type "+type+": Could not fetch file "+ file, e);
throw new InternalErrorException("Cannot install resource of type " + type + ": Could not fetch file " + file, e);
}
}
}
@ -277,21 +283,51 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
* Create a resource or update it, if its already existing.
*/
private void createOrUpdate(IBaseResource resource) {
IFhirResourceDao dao = daoRegistry.getResourceDao(resource.getClass());
IBundleProvider searchResult = dao.search(createSearchParameterMapFor(resource));
if (searchResult.isEmpty()) {
dao.create(resource);
} else {
IBaseResource existingResource = verifySearchResultFor(resource, searchResult);
if (existingResource != null) {
resource.setId(existingResource.getIdElement().getValue());
dao.update(resource);
try {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resource.getClass());
IBundleProvider searchResult = dao.search(createSearchParameterMapFor(resource));
if (searchResult.isEmpty()) {
if (validForUpload(resource)) {
dao.create(resource);
}
} else {
IBaseResource existingResource = verifySearchResultFor(resource, searchResult);
if (existingResource != null) {
resource.setId(existingResource.getIdElement().getValue());
dao.update(resource);
}
}
} catch (BaseServerResponseException e) {
ourLog.warn("Failed to upload resource of type {} with ID {} - Error: {}", myFhirContext.getResourceType(resource), resource.getIdElement().getValue(), e.toString());
}
}
boolean validForUpload(IBaseResource theResource) {
String resourceType = myFhirContext.getResourceType(theResource);
if ("SearchParameter".equals(resourceType)) {
String code = SearchParameterUtil.getCode(myFhirContext, theResource);
if (defaultString(code).startsWith("_")) {
return false;
}
String expression = SearchParameterUtil.getExpression(myFhirContext, theResource);
if (isBlank(expression)) {
return false;
}
if (SearchParameterUtil.getBaseAsStrings(myFhirContext, theResource).isEmpty()) {
return false;
}
}
return true;
}
private boolean isStructureDefinitionWithoutSnapshot(IBaseResource r) {
FhirTerser terser = fhirContext.newTerser();
FhirTerser terser = myFhirContext.newTerser();
return r.getClass().getSimpleName().equals("StructureDefinition") &&
terser.getSingleValueOrNull(r, "snapshot") == null;
}
@ -319,7 +355,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
}
private IBaseResource verifySearchResultFor(IBaseResource resource, IBundleProvider searchResult) {
FhirTerser terser = fhirContext.newTerser();
FhirTerser terser = myFhirContext.newTerser();
if (resource.getClass().getSimpleName().equals("NamingSystem")) {
if (searchResult.size() > 1) {
ourLog.warn("Expected 1 NamingSystem with unique ID {}, found {}. Will not attempt to update resource.",
@ -346,7 +382,7 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
}
private String extractUniqeIdFromNamingSystem(IBaseResource resource) {
FhirTerser terser = fhirContext.newTerser();
FhirTerser terser = myFhirContext.newTerser();
IBase uniqueIdComponent = (IBase) terser.getSingleValueOrNull(resource, "uniqueId");
if (uniqueIdComponent == null) {
throw new ImplementationGuideInstallationException("NamingSystem does not have uniqueId component.");
@ -356,17 +392,22 @@ public class PackageInstallerSvcImpl implements IPackageInstallerSvc {
}
private String extractIdFromSubscription(IBaseResource resource) {
FhirTerser terser = fhirContext.newTerser();
FhirTerser terser = myFhirContext.newTerser();
IPrimitiveType asPrimitiveType = (IPrimitiveType) terser.getSingleValueOrNull(resource, "id");
return (String) asPrimitiveType.getValue();
}
private String extractUniqueUrlFromMetadataResouce(IBaseResource resource) {
FhirTerser terser = fhirContext.newTerser();
FhirTerser terser = myFhirContext.newTerser();
IPrimitiveType asPrimitiveType = (IPrimitiveType) terser.getSingleValueOrNull(resource, "url");
return (String) asPrimitiveType.getValue();
}
@VisibleForTesting
void setFhirContextForUnitTest(FhirContext theCtx) {
myFhirContext = theCtx;
}
private static IBaseResource getFirstResourceFrom(IBundleProvider searchResult) {
try {
return searchResult.getResources(0, 0).get(0);

View File

@ -1605,6 +1605,9 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
if (element.hasTarget()) {
TermConceptMapGroupElementTarget termConceptMapGroupElementTarget;
for (ConceptMap.TargetElementComponent elementTarget : element.getTarget()) {
if (isBlank(elementTarget.getCode())) {
continue;
}
termConceptMapGroupElementTarget = new TermConceptMapGroupElementTarget();
termConceptMapGroupElementTarget.setConceptMapGroupElement(termConceptMapGroupElement);
termConceptMapGroupElementTarget.setCode(elementTarget.getCode());

View File

@ -1086,6 +1086,51 @@ public class FhirResourceDaoR4ConceptMapTest extends BaseJpaR4Test {
}
/**
* Handle ConceptMaps where targets are missing, such as this one:
*
* https://www.hl7.org/fhir/conceptmap-example-specimen-type.html
*/
@Test
public void testUploadConceptMapWithMappingTargetsMissing() {
ConceptMap cm = new ConceptMap();
cm.setUrl("http://foo");
cm.setSource(new CanonicalType("http://source"));
cm.setTarget(new CanonicalType("http://target"));
cm.addGroup().addElement().setCode("source1").addTarget().setCode("target1").setEquivalence(ConceptMapEquivalence.EQUAL);
cm.addGroup().addElement().setCode("source2"); // no target
cm.addGroup().addElement().setCode("source3").addTarget().setComment("No target code"); // no target code
myConceptMapDao.create(cm);
runInTransaction(()->{
TranslationRequest translationRequest = new TranslationRequest();
translationRequest.getCodeableConcept().addCoding()
.setSystem("http://source")
.setCode("source1");
translationRequest.setTarget(new UriType("http://target"));
ourLog.info("*** About to translate");
TranslationResult translationResult = myConceptMapDao.translate(translationRequest, null);
ourLog.info("*** Done translating");
assertTrue(translationResult.getResult().booleanValue());
assertEquals("Matches found!", translationResult.getMessage().getValueAsString());
assertEquals(1, translationResult.getMatches().size());
TranslationMatch translationMatch = translationResult.getMatches().get(0);
assertEquals("equal", translationMatch.getEquivalence().getCode());
Coding concept = translationMatch.getConcept();
assertEquals("target1", concept.getCode());
assertEquals(null, concept.getDisplay());
assertEquals("http://target", concept.getSystem());
});
}
@Test
public void testUploadAndApplyR4DemoConceptMap() throws IOException {

View File

@ -91,6 +91,46 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
public void beforeDisableResultReuse() {
myDaoConfig.setReuseCachedSearchResultsForMillis(null);
myModelConfig.setDefaultSearchParamsCanBeOverridden(new ModelConfig().isDefaultSearchParamsCanBeOverridden());
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(new DaoConfig().isMarkResourcesForReindexingUponSearchParameterChange());
}
@Test
public void testStoreSearchParamWithBracketsInExpression() {
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(true);
SearchParameter fooSp = new SearchParameter();
fooSp.setCode("foo");
fooSp.addBase("ActivityDefinition");
fooSp.setType(Enumerations.SearchParamType.REFERENCE);
fooSp.setTitle("FOO SP");
fooSp.setExpression("(ActivityDefinition.useContext.value as Quantity) | (ActivityDefinition.useContext.value as Range)");
fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);
// Ensure that no exceptions are thrown
mySearchParameterDao.create(fooSp, mySrd);
mySearchParamRegistry.forceRefresh();
}
/**
* Draft search parameters should be ok even if they aren't completely valid
*/
@Test
public void testStoreDraftSearchParam_DontValidate() {
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(true);
SearchParameter fooSp = new SearchParameter();
fooSp.setCode("foo");
fooSp.addBase("ActivityDefinition");
fooSp.setType(Enumerations.SearchParamType.REFERENCE);
fooSp.setTitle("FOO SP");
fooSp.setExpression("FOO FOO FOO");
fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
fooSp.setStatus(Enumerations.PublicationStatus.DRAFT);
// Ensure that no exceptions are thrown
mySearchParameterDao.create(fooSp, mySrd);
mySearchParamRegistry.forceRefresh();
}

View File

@ -27,6 +27,7 @@ import org.hl7.fhir.r4.model.StructureDefinition;
import org.hl7.fhir.utilities.cache.NpmPackage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -95,6 +96,18 @@ public class NpmTestR4 extends BaseJpaR4Test {
}
@Disabled("This test is super slow so don't run by default")
@Test
public void testInstallUsCore() {
JpaPackageCache jpaPackageCache = ProxyUtil.getSingletonTarget(myPackageCacheManager, JpaPackageCache.class);
jpaPackageCache.getPackageServers().clear();
jpaPackageCache.addPackageServer("https://packages.fhir.org");
PackageInstallationSpec spec = new PackageInstallationSpec().setName("hl7.fhir.us.core").setVersion("3.1.0").setInstallMode(PackageInstallationSpec.InstallModeEnum.STORE_AND_INSTALL).setFetchDependencies(true);
igInstaller.install(spec);
}
@Test
public void testCacheDstu3Package() throws Exception {
byte[] bytes = loadClasspathBytes("/packages/nictiz.fhir.nl.stu3.questionnaires-1.0.2.tgz");

View File

@ -0,0 +1,60 @@
package ca.uhn.fhir.jpa.packages;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class PackageInstallerSvcImplTest {
private final FhirContext myCtx = FhirContext.forCached(FhirVersionEnum.R4);
private PackageInstallerSvcImpl mySvc;
@BeforeEach
public void before() {
mySvc = new PackageInstallerSvcImpl();
mySvc.setFhirContextForUnitTest(myCtx);
}
@Test
public void testValidForUpload_SearchParameterWithMetaParam() {
SearchParameter sp = new SearchParameter();
sp.setCode("_id");
assertFalse(mySvc.validForUpload(sp));
}
@Test
public void testValidForUpload_SearchParameterWithNoBase() {
SearchParameter sp = new SearchParameter();
sp.setCode("name");
sp.setExpression("Patient.name");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
assertFalse(mySvc.validForUpload(sp));
}
@Test
public void testValidForUpload_SearchParameterWithNoExpression() {
SearchParameter sp = new SearchParameter();
sp.setCode("name");
sp.addBase("Patient");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
assertFalse(mySvc.validForUpload(sp));
}
@Test
public void testValidForUpload_GoodSearchParameter() {
SearchParameter sp = new SearchParameter();
sp.setCode("name");
sp.addBase("Patient");
sp.setExpression("Patient.name");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
assertTrue(mySvc.validForUpload(sp));
}
}

View File

@ -20,9 +20,10 @@ package ca.uhn.fhir.jpa.subscription.channel.api;
* #L%
*/
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel {
public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel, DisposableBean {
String getName();
}

View File

@ -52,4 +52,9 @@ public class LinkedBlockingChannel extends ExecutorSubscribableChannel implement
public String getName() {
return myName;
}
@Override
public void destroy() {
// nothing
}
}

View File

@ -0,0 +1,75 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.apache.commons.lang3.Validate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import java.util.Set;
public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver {
private final IChannelReceiver myWrappedChannel;
private final MessageHandler myHandler;
public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) {
myHandler = message -> send(message);
theChannel.subscribe(myHandler);
myWrappedChannel = theChannel;
}
public SubscribableChannel getWrappedChannel() {
return myWrappedChannel;
}
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
Set<MessageHandler> subscribers = getSubscribers();
Validate.isTrue(subscribers.size() > 0, "Channel has zero subscribers");
for (MessageHandler next : subscribers) {
next.handleMessage(theMessage);
}
return true;
}
@Override
public void destroy() throws Exception {
myWrappedChannel.destroy();
myWrappedChannel.unsubscribe(myHandler);
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
myWrappedChannel.addInterceptor(interceptor);
}
@Override
public String getName() {
return myWrappedChannel.getName();
}
}

View File

@ -29,12 +29,6 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
public class SubscriptionChannelFactory {
private final IChannelFactory myChannelFactory;
@ -105,44 +99,4 @@ public class SubscriptionChannelFactory {
return myChannelFactory;
}
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver, DisposableBean {
private final IChannelReceiver myWrappedChannel;
public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) {
theChannel.subscribe(message -> send(message));
myWrappedChannel = theChannel;
}
public SubscribableChannel getWrappedChannel() {
return myWrappedChannel;
}
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
for (MessageHandler next : getSubscribers()) {
next.handleMessage(theMessage);
}
return true;
}
@Override
public void destroy() throws Exception {
if (myWrappedChannel instanceof DisposableBean) {
((DisposableBean) myWrappedChannel).destroy();
}
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
myWrappedChannel.addInterceptor(interceptor);
}
@Override
public String getName() {
return myWrappedChannel.getName();
}
}
}

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
@ -44,7 +46,7 @@ public class MatchingQueueSubscriberLoader {
@Autowired
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
protected SubscribableChannel myMatchingChannel;
protected IChannelReceiver myMatchingChannel;
@EventListener(classes = {ContextRefreshedEvent.class})
public void handleContextRefreshEvent() {
@ -61,8 +63,10 @@ public class MatchingQueueSubscriberLoader {
@SuppressWarnings("unused")
@PreDestroy
public void stop() {
public void stop() throws Exception {
if (myMatchingChannel != null) {
ourLog.info("Destroying matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
myMatchingChannel.destroy();
myMatchingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
myMatchingChannel.unsubscribe(mySubscriptionActivatingSubscriber);
myMatchingChannel.unsubscribe(mySubscriptionRegisteringSubscriber);

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageDeliveryException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
class BroadcastingSubscribableChannelWrapperTest {
@Mock
private IChannelReceiver myReceiver;
@Test
public void testFailIfNoSubscribers() {
BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver);
try {
svc.send(new ResourceModifiedJsonMessage(new ResourceModifiedMessage()));
} catch (MessageDeliveryException e) {
assertThat(e.getMessage(), containsString("Channel has zero subscribers"));
}
}
@Test
public void testWrappedChannelDestroyed() throws Exception {
BroadcastingSubscribableChannelWrapper svc = new BroadcastingSubscribableChannelWrapper(myReceiver);
svc.destroy();
verify(myReceiver, times(1)).destroy();
}
}

View File

@ -95,6 +95,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private static SubscribableChannel ourSubscribableChannel;
protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
protected final PointcutLatch mySubscriptionAfterDelivery = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_DELIVERY);
@BeforeEach
public void beforeReset() {
@ -111,6 +112,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourSubscribableChannel.subscribe(subscriptionRegisteringSubscriber);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, mySubscriptionAfterDelivery);
}
@AfterEach

View File

@ -73,9 +73,11 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
sendObservation(code, "SNOMED-CT");
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
assertEquals(0, ourContentTypes.size());
}

View File

@ -69,9 +69,11 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
sendObservation(code, "SNOMED-CT");
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
assertEquals(0, ourContentTypes.size());
}

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.parser;
import ca.uhn.fhir.model.api.annotation.Child;
import ca.uhn.fhir.model.api.annotation.Extension;
import ca.uhn.fhir.model.api.annotation.ResourceDef;
import org.hl7.fhir.dstu3.model.CodeType;
import org.hl7.fhir.dstu3.model.Patient;
@ResourceDef(
name = "Patient",
profile = "http://acme.org//StructureDefinition/patient-with-eyes"
)
public class ExtendedPatient extends Patient {
@Child(name = "eyeColour")
@Extension(url = "http://acme.org/#extpt", definedLocally = false, isModifier = false)
private CodeType myEyeColour;
public CodeType getEyeColour() {
if (myEyeColour == null) {
myEyeColour = new CodeType();
}
return myEyeColour;
}
public void setEyeColour(CodeType theEyeColour) {
myEyeColour = theEyeColour;
}
}

View File

@ -0,0 +1,56 @@
package ca.uhn.fhir.parser;
import ca.uhn.fhir.context.FhirContext;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.dstu3.model.CodeType;
import org.hl7.fhir.dstu3.model.Patient;
import org.hl7.fhir.dstu3.model.Reference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
public class ExtendedPatientTest {
@Test
public void testBundleReferences() {
FhirContext fhirContext = FhirContext.forDstu3();
fhirContext.setDefaultTypeForProfile("http://acme.org//StructureDefinition/patient-with-eyes", ExtendedPatient.class);
ExtendedPatient homer = new ExtendedPatient();
homer.setId("homer");
homer.addName().setFamily("Simpson").addGiven("Homer");
ExtendedPatient marge = new ExtendedPatient();
marge.setId("marge");
marge.addName().setFamily("Simpson").addGiven("Marge");
marge.setEyeColour(new CodeType("blue"));
marge.getLink().add(new Patient.PatientLinkComponent()
.setType(Patient.LinkType.REFER)
.setOther(new Reference("Patient/homer")));
Bundle bundle = new Bundle()
.addEntry(new Bundle.BundleEntryComponent()
.setFullUrl("http://acme.org/Patient/homer").setResource(homer)
.setSearch(new Bundle.BundleEntrySearchComponent()
.setMode(Bundle.SearchEntryMode.INCLUDE)))
.addEntry(new Bundle.BundleEntryComponent()
.setFullUrl("http://acme.org/Patient/marge").setResource(marge)
.setSearch(new Bundle.BundleEntrySearchComponent()));
IParser p = fhirContext.newXmlParser().setPrettyPrint(true);
String encoded = p.encodeResourceToString(bundle);
Bundle parsedBundle = p.parseResource(Bundle.class, encoded);
ExtendedPatient parsedHomer = (ExtendedPatient)parsedBundle.getEntry().get(0).getResource();
ExtendedPatient parsedMarge = (ExtendedPatient)parsedBundle.getEntry().get(1).getResource();
IBaseResource referencedHomer = parsedMarge.getLinkFirstRep().getOther().getResource();
assertNotNull(referencedHomer);
assertSame(parsedHomer, referencedHomer);
}
}

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.parser;
import ca.uhn.fhir.model.api.annotation.Child;
import ca.uhn.fhir.model.api.annotation.Extension;
import ca.uhn.fhir.model.api.annotation.ResourceDef;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Patient;
@ResourceDef(
name = "Patient",
profile = "http://acme.org//StructureDefinition/patient-with-eyes"
)
public class ExtendedPatient extends Patient {
@Child(name = "eyeColour")
@Extension(url = "http://acme.org/#extpt", definedLocally = false, isModifier = false)
private CodeType myEyeColour;
public CodeType getEyeColour() {
if (myEyeColour == null) {
myEyeColour = new CodeType();
}
return myEyeColour;
}
public void setEyeColour(CodeType theEyeColour) {
myEyeColour = theEyeColour;
}
}

View File

@ -0,0 +1,56 @@
package ca.uhn.fhir.parser;
import ca.uhn.fhir.context.FhirContext;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
public class ExtendedPatientTest {
@Test
public void testBundleReferences() {
FhirContext fhirContext = FhirContext.forR4();
fhirContext.setDefaultTypeForProfile("http://acme.org//StructureDefinition/patient-with-eyes", ExtendedPatient.class);
ExtendedPatient homer = new ExtendedPatient();
homer.setId("homer");
homer.addName().setFamily("Simpson").addGiven("Homer");
ExtendedPatient marge = new ExtendedPatient();
marge.setId("marge");
marge.addName().setFamily("Simpson").addGiven("Marge");
marge.setEyeColour(new CodeType("blue"));
marge.getLink().add(new Patient.PatientLinkComponent()
.setType(Patient.LinkType.REFER)
.setOther(new Reference("Patient/homer")));
Bundle bundle = new Bundle()
.addEntry(new Bundle.BundleEntryComponent()
.setFullUrl("http://acme.org/Patient/homer").setResource(homer)
.setSearch(new Bundle.BundleEntrySearchComponent()
.setMode(Bundle.SearchEntryMode.INCLUDE)))
.addEntry(new Bundle.BundleEntryComponent()
.setFullUrl("http://acme.org/Patient/marge").setResource(marge)
.setSearch(new Bundle.BundleEntrySearchComponent()));
IParser p = fhirContext.newXmlParser().setPrettyPrint(true);
String encoded = p.encodeResourceToString(bundle);
Bundle parsedBundle = p.parseResource(Bundle.class, encoded);
ExtendedPatient parsedHomer = (ExtendedPatient)parsedBundle.getEntry().get(0).getResource();
ExtendedPatient parsedMarge = (ExtendedPatient)parsedBundle.getEntry().get(1).getResource();
IBaseResource referencedHomer = parsedMarge.getLinkFirstRep().getOther().getResource();
assertNotNull(referencedHomer);
assertSame(parsedHomer, referencedHomer);
}
}