SubscriptionTopic part3 (#4817)

* Force Verify tests

* wip

* merge troubleshooting rel_6_6 troubleshooting changes

* fix ITs (#4809)

* fix RestHookTestR5IT

* fix intermittent

---------

Co-authored-by: Ken Stevens <ken@smilecdr.com>

* post-merge cleanup

* fix test

* fix mock test

* fix wiring

* fix mock test

* fix test

* use IBaseResource.isDeleted()

* fixmes

* cleanup

* change log

---------

Co-authored-by: Tadgh <garygrantgraham@gmail.com>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-05-09 16:29:21 -04:00 committed by GitHub
parent e3545446eb
commit 224b7f6206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 243 additions and 190 deletions

View File

@ -28,6 +28,8 @@ public class Logs {
private static final Logger ourSubscriptionTroubleshootingLog = LoggerFactory.getLogger("ca.cdr.log.subscription_troubleshooting");
private static final Logger ourSubscriptionTopicLog = LoggerFactory.getLogger("ca.uhn.fhir.log.subscription_topic_troubleshooting");
public static Logger getBatchTroubleshootingLog() {
return ourBatchTroubleshootingLog;
}
@ -39,4 +41,8 @@ public class Logs {
public static Logger getSubscriptionTroubleshootingLog() {
return ourSubscriptionTroubleshootingLog;
}
public static Logger getSubscriptionTopicLog() {
return ourSubscriptionTopicLog;
}
}

View File

@ -19,15 +19,16 @@
*/
package org.hl7.fhir.instance.model.api;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.model.api.IElement;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.model.api.IElement;
import ca.uhn.fhir.model.api.Include;
/**
* For now, this is a simple marker interface indicating that a class is a resource type.
* There are two concrete types of implementations of this interrface. The first are
@ -58,4 +59,10 @@ public interface IBaseResource extends IBase, IElement {
FhirVersionEnum getStructureFhirVersionEnum();
/**
* @return <code>true</code> if this resource has been deleted
*/
default boolean isDeleted() {
return ResourceMetadataKeyEnum.DELETED_AT.get(this) != null;
}
}

View File

@ -0,0 +1,8 @@
---
type: change
issue: 4817
title: "Introduce IBaseResource.isDeleted() method and convert code to use it.
Add subscription_topic_troubleshooting log.
No longer rely on ResourceGoneException to detect deleted subscription. Instead use the new isDeleted() method.
Demote unexpected exceptions in HapiTransactionService from error to debug since these exceptions are expected
e.g. when checking if a resource has been deleted by catching a ResourceGoneException"

View File

@ -1412,11 +1412,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
myJpaStorageResourceParser.populateResourceMetadata(entity, false, tagList, version, theResource);
boolean wasDeleted = false;
// NB If this if-else ever gets collapsed, make sure to account for possible null (will happen in mass-ingestion mode)
if (theOldResource instanceof IResource) {
wasDeleted = ResourceMetadataKeyEnum.DELETED_AT.get((IResource) theOldResource) != null;
} else if (theOldResource instanceof IAnyResource) {
wasDeleted = ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) theOldResource) != null;
if (theOldResource != null) {
wasDeleted = theOldResource.isDeleted();
}
DaoMethodOutcome outcome = toMethodOutcome(theRequestDetails, savedEntity, theResource, theMatchUrl, theOperationType).setCreated(wasDeleted);

View File

@ -29,7 +29,6 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
@ -93,17 +92,15 @@ public class SubscriptionRegisteringSubscriber implements MessageHandler {
// - in order to store partition id in the userdata of the resource for partitioned subscriptions
// - in case we're processing out of order and a create-then-delete has been processed backwards (or vice versa)
IBaseResource payloadResource;
IIdType payloadId = payload.getPayloadId(myFhirContext).toUnqualifiedVersionless();
try {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload);
payloadResource = subscriptionDao.read(payloadId, systemRequestDetails);
if (payloadResource == null) {
// Only for unit test
payloadResource = payload.getPayload(myFhirContext);
}
} catch (ResourceGoneException e) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload);
IBaseResource payloadResource = subscriptionDao.read(payloadId, systemRequestDetails, true);
if (payloadResource == null) {
// Only for unit test
payloadResource = payload.getPayload(myFhirContext);
}
if (payloadResource.isDeleted()) {
mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payloadId.getIdPart());
return;
}

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
@ -31,11 +32,10 @@ public final class SubscriptionTopicCanonicalizer {
}
// WIP STR5 use elsewhere
public static SubscriptionTopic canonicalize(FhirContext theFhirContext, IBaseResource theSubscriptionTopic) {
public static SubscriptionTopic canonicalizeTopic(FhirContext theFhirContext, IBaseResource theSubscriptionTopic) {
switch (theFhirContext.getVersion().getVersion()) {
case R4B:
String encoded = theFhirContext.newJsonParser().encodeResourceToString(theSubscriptionTopic);
return ourFhirContextR5.newJsonParser().parseResource(SubscriptionTopic.class, encoded);
return (SubscriptionTopic) VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r4b.model.SubscriptionTopic) theSubscriptionTopic);
case R5:
return (SubscriptionTopic) theSubscriptionTopic;
default:

View File

@ -26,11 +26,11 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
@ -40,7 +40,7 @@ import java.util.Set;
public class SubscriptionTopicLoader extends BaseResourceCacheSynchronizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicLoader.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
@Autowired
private FhirContext myFhirContext;
@ -107,10 +107,7 @@ public class SubscriptionTopicLoader extends BaseResourceCacheSynchronizer {
if (theResource instanceof SubscriptionTopic) {
return (SubscriptionTopic) theResource;
} else if (theResource instanceof org.hl7.fhir.r4b.model.SubscriptionTopic) {
return myFhirContext.newJsonParser().parseResource(SubscriptionTopic.class, FhirContext.forR4BCached().newJsonParser().encodeResourceToString(theResource));
// WIP STR5 VersionConvertorFactory_43_50 when it supports SubscriptionTopic
// track here: https://github.com/hapifhir/org.hl7.fhir.core/issues/1212
// return (SubscriptionTopic) VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r4b.model.SubscriptionTopic) theResource);
return SubscriptionTopicCanonicalizer.canonicalizeTopic(myFhirContext, theResource);
} else {
throw new IllegalArgumentException(Msg.code(2332) + "Only R4B and R5 SubscriptionTopic is currently supported. Found " + theResource.getClass());
}

View File

@ -29,11 +29,11 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
@ -45,7 +45,7 @@ import java.util.List;
import java.util.UUID;
public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicMatchingSubscriber.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
private final FhirContext myFhirContext;
@Autowired

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Bundle;
@ -55,11 +56,7 @@ public class SubscriptionTopicPayloadBuilder {
if (fhirVersion == FhirVersionEnum.R4B) {
bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode());
String serializedSubscriptionStatus = FhirContext.forR5Cached().newJsonParser().encodeResourceToString(subscriptionStatus);
subscriptionStatus = myFhirContext.newJsonParser().parseResource(org.hl7.fhir.r4b.model.SubscriptionStatus.class, serializedSubscriptionStatus);
// WIP STR5 VersionConvertorFactory_43_50 when it supports SubscriptionStatus
// track here: https://github.com/hapifhir/org.hl7.fhir.core/issues/1212
// subscriptionStatus = (SubscriptionStatus) VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r4b.model.SubscriptionStatus) subscriptionStatus);
subscriptionStatus = VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r5.model.SubscriptionStatus) subscriptionStatus);
} else if (fhirVersion == FhirVersionEnum.R5) {
bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode());
} else {

View File

@ -28,12 +28,12 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
@ -48,7 +48,8 @@ import javax.annotation.Nonnull;
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
*/
public class SubscriptionTopicRegisteringSubscriber implements MessageHandler {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicRegisteringSubscriber.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
@Autowired
private FhirContext myFhirContext;
@Autowired
@ -106,7 +107,7 @@ public class SubscriptionTopicRegisteringSubscriber implements MessageHandler {
return;
}
SubscriptionTopic subscriptionTopic = SubscriptionTopicCanonicalizer.canonicalize(myFhirContext, payloadResource);
SubscriptionTopic subscriptionTopic = SubscriptionTopicCanonicalizer.canonicalizeTopic(myFhirContext, payloadResource);
if (subscriptionTopic.getStatus() == Enumerations.PublicationStatus.ACTIVE) {
mySubscriptionTopicRegistry.register(subscriptionTopic);
} else {

View File

@ -30,14 +30,15 @@ import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.Logs;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubscriptionTopicValidatingInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicValidatingInterceptor.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
private final FhirContext myFhirContext;
private final SubscriptionQueryValidator mySubscriptionQueryValidator;
@ -69,7 +70,7 @@ public class SubscriptionTopicValidatingInterceptor {
return;
}
SubscriptionTopic subscriptionTopic = SubscriptionTopicCanonicalizer.canonicalize(myFhirContext, theSubscription);
SubscriptionTopic subscriptionTopic = SubscriptionTopicCanonicalizer.canonicalizeTopic(myFhirContext, theSubscription);
boolean finished = false;
if (subscriptionTopic.getStatus() == null) {

View File

@ -24,17 +24,18 @@ import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r5.model.Enumeration;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class SubscriptionTriggerMatcher {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggerMatcher.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
private final SubscriptionTopicSupport mySubscriptionTopicSupport;
private final BaseResourceMessage.OperationTypeEnum myOperation;
private final SubscriptionTopic.SubscriptionTopicResourceTriggerComponent myTrigger;

View File

@ -4,17 +4,19 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.rest.server.messaging.json.ResourceOperationJsonMessage;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.codesystems.SubscriptionStatus;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -34,6 +36,7 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -46,8 +49,8 @@ public class SubscriptionRegisteringSubscriberTest {
private FhirContext myFhirContext = FhirContext.forR4Cached();
@Mock
private SubscriptionRegistry mySubscriptionRegistry;
@Mock
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Spy
private SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(myFhirContext);
@Mock
private DaoRegistry myDaoRegistry;
@Mock
@ -61,8 +64,15 @@ public class SubscriptionRegisteringSubscriberTest {
@BeforeEach
public void beforeEach() {
mySubscription = new Subscription();
mySubscription.setId("Subscription/testrest");
mySubscription = buildSubscription();
}
@NotNull
private static Subscription buildSubscription() {
Subscription subscription = new Subscription();
subscription.setId("Subscription/testrest");
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
return subscription;
}
@Test
@ -79,7 +89,9 @@ public class SubscriptionRegisteringSubscriberTest {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(resourceModifiedMessage);
when(myDaoRegistry.getResourceDao("Subscription")).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), any())).thenThrow(ResourceGoneException.class);
Subscription deletedSubscription = buildSubscription();
ResourceMetadataKeyEnum.DELETED_AT.put(deletedSubscription, InstantType.withCurrentTime());
when(mySubscriptionDao.read(any(), any(), eq(true))).thenReturn(deletedSubscription);
mySubscriptionRegisteringSubscriber.handleMessage(message);
verify(mySubscriptionRegistry, times(1)).unregisterSubscriptionIfRegistered(any());
@ -92,7 +104,7 @@ public class SubscriptionRegisteringSubscriberTest {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(resourceModifiedMessage);
when(myDaoRegistry.getResourceDao("Subscription")).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), any())).thenReturn(mySubscription);
when(mySubscriptionDao.read(any(), any(), eq(true))).thenReturn(mySubscription);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(mySubscription)).thenReturn(SubscriptionStatus.ACTIVE.toCode());
mySubscriptionRegisteringSubscriber.handleMessage(message);
@ -106,7 +118,7 @@ public class SubscriptionRegisteringSubscriberTest {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(resourceModifiedMessage);
when(myDaoRegistry.getResourceDao("Subscription")).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), any())).thenReturn(mySubscription);
when(mySubscriptionDao.read(any(), any(), eq(true))).thenReturn(mySubscription);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(mySubscription)).thenReturn(SubscriptionStatus.ERROR.toCode());
mySubscriptionRegisteringSubscriber.handleMessage(message);
@ -126,7 +138,7 @@ public class SubscriptionRegisteringSubscriberTest {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(resourceModifiedMessage);
when(myDaoRegistry.getResourceDao("Subscription")).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), requestDetailsCaptor.capture())).thenReturn(mySubscription);
when(mySubscriptionDao.read(any(), requestDetailsCaptor.capture(), eq(true))).thenReturn(mySubscription);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(mySubscription)).thenReturn(SubscriptionStatus.ACTIVE.toCode());
mySubscriptionRegisteringSubscriber.handleMessage(message);
@ -147,7 +159,7 @@ public class SubscriptionRegisteringSubscriberTest {
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(resourceModifiedMessage);
when(myDaoRegistry.getResourceDao("Subscription")).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), requestDetailsCaptor.capture())).thenReturn(mySubscription);
when(mySubscriptionDao.read(any(), requestDetailsCaptor.capture(), eq(true))).thenReturn(mySubscription);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(mySubscription)).thenReturn(SubscriptionStatus.ACTIVE.toCode());
mySubscriptionRegisteringSubscriber.handleMessage(message);

View File

@ -12,6 +12,7 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSearchParamProvider;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.system.HapiSystemProperties;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -35,6 +36,7 @@ import static org.mockito.Mockito.mock;
BaseSubscriptionTest.MyConfig.class
})
public abstract class BaseSubscriptionTest {
private static final SubscriptionDebugLogInterceptor ourSubscriptionDebugLogInterceptor = new SubscriptionDebugLogInterceptor();
static {
HapiSystemProperties.enableUnitTestMode();
@ -52,11 +54,13 @@ public abstract class BaseSubscriptionTest {
@BeforeEach
public void before() {
mySearchParamRegistry.handleInit(Collections.emptyList());
myInterceptorRegistry.registerInterceptor(ourSubscriptionDebugLogInterceptor);
}
@AfterEach
public void afterClearAnonymousLambdas() {
myInterceptorRegistry.unregisterAllInterceptors();
myInterceptorRegistry.unregisterInterceptor(ourSubscriptionDebugLogInterceptor);
}
public void initSearchParamRegistry(IBaseResource theReadResource) {
@ -68,7 +72,7 @@ public abstract class BaseSubscriptionTest {
public static class MyConfig {
@Bean
public JpaStorageSettings storageSettings() {
public JpaStorageSettings jpaStorageSettings() {
return new JpaStorageSettings();
}

View File

@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.subscription.module.config;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.cache.ResourceVersionMap;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.springframework.context.annotation.Bean;
@ -25,11 +24,6 @@ public class TestSubscriptionConfig {
return new PartitionSettings();
}
@Bean
public StorageSettings storageSettings() {
return new StorageSettings();
}
@Bean
public IGenericClient fhirClient() {
return mock(IGenericClient.class);

View File

@ -11,7 +11,6 @@ import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
@ -100,8 +99,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
@Autowired
IInterceptorService myInterceptorRegistry;
@Autowired
private SubscriptionLoader mySubscriptionLoader;
@Autowired
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@BeforeEach

View File

@ -401,7 +401,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
Subscription modifiedSubscription = subscription.copy();
// the original partition info was the request info, but we need the actual storage partition.
modifiedSubscription.setUserData(Constants.RESOURCE_PARTITION_ID, theRequestPartitionId);
when(myMockSubscriptionDao.read(eq(subscription.getIdElement()), any())).thenReturn(modifiedSubscription);
when(myMockSubscriptionDao.read(eq(subscription.getIdElement()), any(), eq(true))).thenReturn(modifiedSubscription);
}
@Nested

View File

@ -0,0 +1,20 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import org.hl7.fhir.r4b.model.Enumerations;
import org.hl7.fhir.r4b.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
class SubscriptionTopicCanonicalizerTest {
@Test
public void testCanonicalizeTopic() {
SubscriptionTopic topic = new SubscriptionTopic();
topic.setId("123");
topic.setStatus(Enumerations.PublicationStatus.ACTIVE);
org.hl7.fhir.r5.model.SubscriptionTopic canonicalized = SubscriptionTopicCanonicalizer.canonicalizeTopic(FhirContext.forR4BCached(), topic);
assertEquals("123", canonicalized.getId());
assertEquals(org.hl7.fhir.r5.model.Enumerations.PublicationStatus.ACTIVE, canonicalized.getStatus());
}
}

View File

@ -104,7 +104,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -693,7 +692,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
IBundleProvider history = myPatientDao.history(null, null, null, mySrd);
assertEquals(4 + initialHistory, history.sizeOrThrowNpe());
List<IBaseResource> resources = history.getResources(0, 4);
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) resources.get(0)));
assertTrue(resources.get(0).isDeleted());
try {
myPatientDao.delete(id2, mySrd);
@ -796,10 +795,8 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test
@ -1206,13 +1203,13 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
assertEquals(id.withVersion("2"), entries.get(1).getIdElement());
assertEquals(id.withVersion("1"), entries.get(2).getIdElement());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) entries.get(0)));
assertFalse(entries.get(0).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.PUT, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get((IResource) entries.get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) entries.get(1)));
assertTrue(entries.get(1).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.DELETE, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get((IResource) entries.get(1)));
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) entries.get(2)));
assertFalse(entries.get(2).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.POST, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get((IResource) entries.get(2)));
}

View File

@ -5,7 +5,6 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.api.TagList;
import ca.uhn.fhir.model.base.composite.BaseCodingDt;
@ -63,6 +62,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -739,10 +739,8 @@ public class FhirSystemDaoDstu2Test extends BaseJpaDstu2SystemTest {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IResource) history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test

View File

@ -1055,7 +1055,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
IBundleProvider history = myPatientDao.history(null, null, null, mySrd);
assertEquals(4 + initialHistory, history.size().intValue());
List<IBaseResource> resources = history.getResources(0, 4);
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(resources.get(0)));
assertTrue(resources.get(0).isDeleted());
try {
myPatientDao.delete(id2, mySrd);
@ -1164,10 +1164,8 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test
@ -1622,13 +1620,13 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
assertEquals(id.withVersion("2"), entries.get(1).getIdElement());
assertEquals(id.withVersion("1"), entries.get(2).getIdElement());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(0)));
assertFalse(entries.get(0).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.PUT, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(1)));
assertTrue(entries.get(1).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.DELETE, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(1)));
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(2)));
assertFalse(entries.get(2).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.POST, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(2)));
}

View File

@ -12,7 +12,6 @@ import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.model.entity.ResourceTag;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.LenientErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
@ -59,7 +58,6 @@ import org.hl7.fhir.dstu3.model.Quantity;
import org.hl7.fhir.dstu3.model.Reference;
import org.hl7.fhir.dstu3.model.Resource;
import org.hl7.fhir.dstu3.model.UriType;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -95,6 +93,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -1336,10 +1335,8 @@ public class FhirSystemDaoDstu3Test extends BaseJpaDstu3SystemTest {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test

View File

@ -32,7 +32,6 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.DateRangeParam;
@ -50,7 +49,6 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.BundleBuilder;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
@ -64,7 +62,6 @@ import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Address;
import org.hl7.fhir.r4.model.Age;
import org.hl7.fhir.r4.model.Attachment;
import org.hl7.fhir.r4.model.Bundle;
@ -102,7 +99,6 @@ import org.hl7.fhir.r4.model.OperationOutcome.IssueType;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Period;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.Provenance;
import org.hl7.fhir.r4.model.Quantity;
import org.hl7.fhir.r4.model.Quantity.QuantityComparator;
@ -1422,7 +1418,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
IBundleProvider history = myPatientDao.history(null, null, null, mySrd);
assertEquals(4 + initialHistory, history.size().intValue());
List<IBaseResource> resources = history.getResources(0, 4);
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(resources.get(0)));
assertTrue(resources.get(0).isDeleted());
try {
myPatientDao.delete(id2, mySrd);
@ -1597,10 +1593,8 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test
@ -2077,13 +2071,13 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
assertEquals(id.withVersion("2"), entries.get(1).getIdElement());
assertEquals(id.withVersion("1"), entries.get(2).getIdElement());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(0)));
assertFalse(entries.get(0).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.PUT, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(1)));
assertTrue(entries.get(1).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.DELETE, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(1)));
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get(entries.get(2)));
assertFalse(entries.get(2).isDeleted());
assertEquals(BundleEntryTransactionMethodEnum.POST, ResourceMetadataKeyEnum.ENTRY_TRANSACTION_METHOD.get(entries.get(2)));
}

View File

@ -14,7 +14,6 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTag;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.provider.r4.SystemProviderR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
@ -39,7 +38,6 @@ import ca.uhn.fhir.util.BundleBuilder;
import ca.uhn.fhir.util.ClasspathUtil;
import org.apache.commons.io.IOUtils;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.AllergyIntolerance;
@ -122,6 +120,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -2536,10 +2535,8 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
IBundleProvider history = myPatientDao.history(id, null, null, null, mySrd);
assertEquals(2, history.size().intValue());
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(0, 1).get(0)));
assertNotNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(0, 1).get(0)).getValue());
assertNull(ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) history.getResources(1, 2).get(0)));
assertTrue(history.getResources(0, 1).get(0).isDeleted());
assertFalse(history.getResources(1, 2).get(0).isDeleted());
}
@Test

View File

@ -86,13 +86,13 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
protected final PointcutLatch mySubscriptionTopicsCheckedLatch = new PointcutLatch(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED);
protected final PointcutLatch mySubscriptionDeliveredLatch = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
@Override
@BeforeEach
protected void before() throws Exception {
super.before();
mySubscriptionTopicDao = myDaoRegistry.getResourceDao(SubscriptionTopic.class);
mySubscriptionTestUtil.registerRestHookInterceptor();
mySubscriptionTestUtil.registerSubscriptionLoggingInterceptor();
ourListenerRestServer.unregisterProvider(mySystemProvider);
ourListenerRestServer.registerProvider(ourTestSystemProvider);
@ -138,6 +138,8 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete());
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
mySubscriptionTestUtil.unregisterSubscriptionLoggingInterceptor();
ourListenerRestServer.unregisterProvider(ourTestSystemProvider);
ourListenerRestServer.registerProvider(mySystemProvider);
mySubscriptionTopicsCheckedLatch.clear();

View File

@ -45,6 +45,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.matchesPattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -76,7 +77,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -94,7 +94,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@NotNull
private Observation sendObservationExpectDelivery() throws InterruptedException {
return sendObservation(OBS_CODE, "SNOMED-CT", true);
return sendObservation(true);
}
@Test
@ -109,7 +109,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
*/
Observation sentObservation = sendObservationExpectDelivery();
sentObservation = myObservationDao.read(sentObservation.getIdElement().toUnqualifiedVersionless());
sentObservation = myObservationDao.read(sentObservation.getIdElement().toUnqualifiedVersionless(), mySrd);
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
@ -129,7 +129,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
sentObservation.getIdentifierFirstRep().setSystem("foo").setValue("2");
updateResource(sentObservation, true);
sentObservation = myObservationDao.read(sentObservation.getIdElement().toUnqualifiedVersionless());
sentObservation = myObservationDao.read(sentObservation.getIdElement().toUnqualifiedVersionless(), mySrd);
// Should see a second subscription notification
awaitUntilReceivedTransactionCount(2);
@ -204,7 +204,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
Observation obs = myObservationDao.read(new IdType(responseBundle.getEntry().get(0).getResponse().getLocation()));
Observation obs = myObservationDao.read(new IdType(responseBundle.getEntry().get(0).getResponse().getLocation()), mySrd);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
Assertions.assertEquals("1", receivedObs.getIdElement().getVersionIdPart());
@ -229,7 +229,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
awaitUntilReceivedTransactionCount(2);
receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless(), mySrd);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
Assertions.assertEquals("2", receivedObs.getIdElement().getVersionIdPart());
@ -274,7 +274,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@NotNull
private Subscription createTopicSubscription() throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -355,14 +354,14 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
idElement = obs2.getIdElement();
assertEquals(sentObservation2.getIdElement().getIdPart(), idElement.getIdPart());
// Now VersionId is stripped
assertEquals(null, idElement.getVersionIdPart());
assertNull(idElement.getVersionIdPart());
}
@Test
public void testRestHookSubscriptionDoesntGetLatestVersionByDefault() throws Exception {
createSubscriptionTopic();
Subscription subscription = createTopicSubscription();
createTopicSubscription();
waitForActivatedSubscriptionCount(1);
myStoppableSubscriptionDeliveringRestHookSubscriber.pause();
@ -370,7 +369,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch);
ourLog.info("** About to send observation");
Observation sentObservation = sendObservation(OBS_CODE, "SNOMED-CT", false);
Observation sentObservation = sendObservation(false);
assertEquals("1", sentObservation.getIdElement().getVersionIdPart());
assertNull(sentObservation.getNoteFirstRep().getText());
@ -419,7 +418,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch);
ourLog.info("** About to send observation");
Observation sentObservation = sendObservation(OBS_CODE, "SNOMED-CT", false);
Observation sentObservation = sendObservation(false);
assertEquals("1", sentObservation.getIdElement().getVersionIdPart());
assertNull(sentObservation.getNoteFirstRep().getText());
@ -445,11 +444,11 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// Subscribe to OBS_CODE topic
Subscription subscription1 = createTopicSubscription();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_JSON_NEW);
Subscription subscription2 = postSubscription(subscription);
// Subscribe to OBS_CODE2 topic
Subscription subscription2 = postSubscription(newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_JSON_NEW));
waitForActivatedSubscriptionCount(2);
Observation sentObservation1 = sendObservationExpectDelivery();
@ -459,49 +458,55 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
Assertions.assertEquals("1", receivedObs.getIdElement().getVersionIdPart());
Subscription subscriptionTemp = myClient.read(Subscription.class, subscription2.getId());
// Update the OBS_CODE2 subscription to subscribe to OBS_CODE
Subscription subscriptionTemp = myClient.read().resource(Subscription.class).withId(subscription2.getId()).execute();
assertNotNull(subscriptionTemp);
subscriptionTemp.setTopic(subscription1.getTopic());
updateResource(subscriptionTemp, false);
Observation observation2 = sendObservationExpectDelivery();
// Should see two subscription notifications since both now point to OBS_CODE2
awaitUntilReceivedTransactionCount(3);
// Delete the second subscription
ourLog.info(">>> Deleting {}", subscription2.getId());
deleteSubscription(subscription2);
Observation observationTemp3 = sendObservationExpectDelivery();
IdType observationTemp3Id = sendObservationExpectDelivery().getIdElement().toUnqualifiedVersionless();
// Should see only one subscription notification
awaitUntilReceivedTransactionCount(4);
Observation observation3 = myClient.read(Observation.class, observationTemp3.getId());
CodeableConcept codeableConcept = new CodeableConcept();
observation3.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode(OBS_CODE + "111");
coding.setSystem("SNOMED-CT");
// Now update the observation to have OBS_CODE2
Observation observation3 = myClient.read().resource(Observation.class).withId(observationTemp3Id).execute();
setCode(observation3, OBS_CODE2);
updateResource(observation3, true);
// Should see one subscription notification even though the new version doesn't match, the old version still does and our subscription topic
// is configured to match if either the old version matches or the new version matches
awaitUntilReceivedTransactionCount(5);
Observation observation3a = myClient.read(Observation.class, observationTemp3.getId());
Observation observation3a = myClient.read().resource(Observation.class).withId(observationTemp3Id).execute();
// Now update it back to OBS_CODE again
setCode(observation3a, OBS_CODE);
updateResource(observation3a, true);
// Should see exactly one subscription notification
awaitUntilReceivedTransactionCount(6);
assertNotEquals(subscription1.getId(), subscription2.getId());
assertFalse(sentObservation1.getId().isEmpty());
assertFalse(observation2.getId().isEmpty());
}
private static void setCode(Observation observation3a, String obsCode) {
CodeableConcept codeableConcept1 = new CodeableConcept();
observation3a.setCode(codeableConcept1);
Coding coding1 = codeableConcept1.addCoding();
coding1.setCode(OBS_CODE);
coding1.setCode(obsCode);
coding1.setSystem("SNOMED-CT");
updateResource(observation3a, true);
// Should see only one subscription notification
awaitUntilReceivedTransactionCount(6);
assertFalse(subscription1.getId().equals(subscription2.getId()));
assertFalse(sentObservation1.getId().isEmpty());
assertFalse(observation2.getId().isEmpty());
}
private void deleteSubscription(Subscription subscription2) throws InterruptedException {
@ -511,11 +516,18 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
private void awaitUntilReceivedTransactionCount(int theExpected) {
if (getSystemProviderCount() == theExpected) {
String list = getReceivedObservations().stream()
.map(t -> t.getIdElement().toUnqualifiedVersionless().getValue() + " " + t.getCode().getCodingFirstRep().getCode())
.collect(Collectors.joining(", "));
ourLog.info("Received {} transactions as expected: {}", theExpected, list);
} else {
String list = getReceivedObservations().stream()
.map(t -> t.getIdElement().toUnqualifiedVersionless().getValue() + " " + t.getCode().getCodingFirstRep().getCode())
.collect(Collectors.joining(", "));
String errorMessage = "Expected " + theExpected + " transactions, have " + getSystemProviderCount() + ": " + list;
await(errorMessage).until(() -> getSystemProviderCount() == theExpected);
String errorMessage = "Expected " + theExpected + " transactions, have " + getSystemProviderCount() + ": " + list;
await(errorMessage).until(() -> getSystemProviderCount() == theExpected);
}
}
@Test
@ -527,7 +539,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@Nonnull
private Subscription createTopicSubscription(String theTopicUrlSuffix) throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + theTopicUrlSuffix, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -537,7 +548,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
public void testSubscriptionTriggerViaSubscription() throws Exception {
createSubscriptionTopic();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -591,7 +601,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info("** About to create non-matching subscription");
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -599,24 +608,24 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info("** About to send observation that wont match");
Observation observation1 = sendObservation(OBS_CODE, "SNOMED-CT", false);
sendObservation(false);
awaitUntilReceivedTransactionCount(0);
ourLog.info("** About to update subscription topic");
SubscriptionTopic subscriptionTopicTemp = myClient.read(SubscriptionTopic.class, subscriptionTopic.getId());
SubscriptionTopic subscriptionTopicTemp = myClient.read().resource(SubscriptionTopic.class).withId(subscriptionTopic.getId()).execute();
assertNotNull(subscriptionTopicTemp);
setSubscriptionTopicCriteria(subscriptionTopicTemp, "Observation?code=SNOMED-CT|" + OBS_CODE);
updateResource(subscriptionTopicTemp, false);
ourLog.info("** About to send Observation 2");
Observation observation2 = sendObservationExpectDelivery();
sendObservationExpectDelivery();
// Should see a subscription notification this time
awaitUntilReceivedTransactionCount(1);
deleteSubscription(subscription);
Observation observationTemp3 = sendObservation(OBS_CODE, "SNOMED-CT", false);
sendObservation(false);
// No more matches
awaitUntilReceivedTransactionCount(1);
@ -632,17 +641,15 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription3 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription1 = postSubscription(subscription3);
// WIP STR5 will likely require matching TopicSubscription
postSubscription(subscription3);
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
Subscription subscription2 = postSubscription(subscription);
postSubscription(subscription);
waitForActivatedSubscriptionCount(2);
Observation observation1 = sendObservationExpectDelivery();
sendObservationExpectDelivery();
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
@ -668,11 +675,10 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
}
@Nonnull
private SubscriptionTopic createSubscriptionTopicWithCriteria(String theCriteria) throws InterruptedException {
private void createSubscriptionTopicWithCriteria(String theCriteria) throws InterruptedException {
SubscriptionTopic subscriptionTopic = buildSubscriptionTopic(CUSTOM_URL);
setSubscriptionTopicCriteria(subscriptionTopic, theCriteria);
return createSubscriptionTopic(subscriptionTopic);
createSubscriptionTopic(subscriptionTopic);
}
@Test
@ -686,14 +692,14 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
Observation receivedObservation = assertBundleAndGetObservation(subscription, sentObservation);
assertBundleAndGetObservation(subscription, sentObservation);
// Disable
subscription.setStatus(Enumerations.SubscriptionStatusCodes.OFF);
updateResource(subscription, false);
// Send another observation
sendObservation(OBS_CODE, "SNOMED-CT", false);
sendObservation(false);
// Should see no new delivery
awaitUntilReceivedTransactionCount(1);
@ -702,7 +708,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@Test
public void testInvalidProvenanceParam() {
assertThrows(UnprocessableEntityException.class, () -> {
String criteriabad = "Provenance?foo=http://hl7.org/fhir/v3/DocumentCompletion%7CAU";
String criteriabad = "Provenance?foo=https://hl7.org/fhir/v3/DocumentCompletion%7CAU";
createSubscriptionTopicWithCriteria(criteriabad);
});
}
@ -728,7 +734,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createSubscriptionTopic();
assertEquals(0, subscriptionCount());
Subscription subscription = createTopicSubscription();
createTopicSubscription();
waitForActivatedSubscriptionCount(1);
assertEquals(1, subscriptionCount());
}
@ -774,7 +780,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setExpression("Observation.extension('Observation#accessType')");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
mySearchParameterDao.create(sp);
mySearchParameterDao.create(sp, mySrd);
mySearchParamRegistry.forceRefresh();
createSubscriptionTopicWithCriteria(criteria);
waitForRegisteredSubscriptionTopicCount(1);
@ -841,14 +847,15 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
private Observation sendObservation(String theCode, String theSystem, boolean theExpectDelivery) throws InterruptedException {
private Observation sendObservation(boolean theExpectDelivery) throws
InterruptedException {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
Coding coding = codeableConcept.addCoding();
coding.setCode(theCode);
coding.setSystem(theSystem);
coding.setCode(OBS_CODE);
coding.setSystem("SNOMED-CT");
observation.setStatus(Enumerations.ObservationStatus.FINAL);

View File

@ -42,5 +42,7 @@
-->
<!-- See https://docs.jboss.org/hibernate/stable/search/reference/en-US/html_single/#backend-lucene-io-writer-infostream for lucene logging
<logger name="org.hibernate.search.backend.lucene.infostream" level="TRACE"/> -->
<logger name="ca.uhn.test.concurrency.PointcutLatch" level="debug"/>
<logger name="ca.uhn.fhir.log.subscription_topic_troubleshooting" level="debug"/>
</configuration>

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.test.util;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCacheRefresher;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
@ -30,12 +31,14 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired;
public class SubscriptionTestUtil {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTestUtil.class);
private static final SubscriptionDebugLogInterceptor ourSubscriptionDebugLogInterceptor = new SubscriptionDebugLogInterceptor();
@Autowired
private JpaStorageSettings myStorageSettings;
@ -49,6 +52,8 @@ public class SubscriptionTestUtil {
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Autowired
private IResourceChangeListenerCacheRefresher myResourceChangeListenerCacheRefresher;
@Autowired
private IInterceptorService myInterceptorRegistry;
public int getExecutorQueueSize() {
LinkedBlockingChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
@ -95,6 +100,14 @@ public class SubscriptionTestUtil {
mySubscriptionSubmitInterceptorLoader.unregisterInterceptorsForUnitTest();
}
// TODO KHS call in all subscription base tests
public void registerSubscriptionLoggingInterceptor() {
myInterceptorRegistry.registerInterceptor(ourSubscriptionDebugLogInterceptor);
}
public void unregisterSubscriptionLoggingInterceptor() {
myInterceptorRegistry.unregisterInterceptor(ourSubscriptionDebugLogInterceptor);
}
public int getExecutorQueueSizeForUnitTests() {
return getExecutorQueueSize();
}

View File

@ -65,7 +65,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -266,7 +275,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
retVal = versions.lastEntry().getValue();
}
if (retVal == null || ResourceMetadataKeyEnum.DELETED_AT.get(retVal) != null) {
if (retVal == null || retVal.isDeleted()) {
throw new ResourceGoneException(Msg.code(2244) + theId);
}
@ -330,7 +339,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
if (next.isEmpty() == false) {
T nextResource = next.lastEntry().getValue();
if (nextResource != null) {
if (ResourceMetadataKeyEnum.DELETED_AT.get(nextResource) == null) {
if (!nextResource.isDeleted()) {
// Clone the resource for search results so that the
// stored metadata doesn't appear in the results
T nextResourceClone = myFhirContext.newTerser().clone(nextResource);

View File

@ -49,7 +49,6 @@ import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.parser.IParser;
@ -89,7 +88,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseBundle;
@ -1518,12 +1516,7 @@ public abstract class BaseTransactionProcessor {
}
}
IPrimitiveType<Date> deletedInstantOrNull;
if (theResource instanceof IAnyResource) {
deletedInstantOrNull = ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) theResource);
} else {
deletedInstantOrNull = ResourceMetadataKeyEnum.DELETED_AT.get((IResource) theResource);
}
IPrimitiveType<Date> deletedInstantOrNull = ResourceMetadataKeyEnum.DELETED_AT.get(theResource);
Date deletedTimestampOrNull = deletedInstantOrNull != null ? deletedInstantOrNull.getValue() : null;
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(theResource.getClass());

View File

@ -234,7 +234,7 @@ public class HapiTransactionService implements IHapiTransactionService {
ExceptionUtils.indexOfThrowable(e, DataIntegrityViolationException.class) != -1 ||
ExceptionUtils.indexOfThrowable(e, ConstraintViolationException.class) != -1 ||
ExceptionUtils.indexOfThrowable(e, ObjectOptimisticLockingFailureException.class) != -1)) {
ourLog.error("Unexpected transaction exception. Will not be retried.", e);
ourLog.debug("Unexpected transaction exception. Will not be retried.", e);
throw e;
} else {

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.IdType;
@ -338,19 +339,24 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myIdElement", myIdElement)
.append("myStatus", myStatus)
.append("myCriteriaString", myCriteriaString)
.append("myEndpointUrl", myEndpointUrl)
.append("myPayloadString", myPayloadString)
ToStringBuilder stringBuilder = new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
.append("idElement", myIdElement);
// .append("status", myStatus)
// .append("endpointUrl", myEndpointUrl)
// .append("payloadString", myPayloadString)
// .append("myHeaders", myHeaders)
.append("myChannelType", myChannelType)
// .append("channelType", myChannelType);
// .append("myTrigger", myTrigger)
// .append("myEmailDetails", myEmailDetails)
// .append("myRestHookDetails", myRestHookDetails)
// .append("myChannelExtensions", myChannelExtensions)
.toString();
if (isTopicSubscription()) {
stringBuilder.append("topic", myTopicSubscription.getTopic());
} else {
stringBuilder.append("criteriaString", myCriteriaString);
}
return stringBuilder.toString();
}
public void setTopicSubscription(boolean theTopicSubscription) {

View File

@ -172,6 +172,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override
public void clear() {
ourLog.debug("Clearing latch {}", getName());
myCountdownLatch.set(null);
myCountdownLatchSetStacktrace.set(null);
}