Allow Subscription to be configured to send delete messages (#3245)

* Allow Subscription to be configured to send delete messages

* Add tests and implement MR suggestions

Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>
This commit is contained in:
jmarchionatto 2021-12-14 16:32:56 -05:00 committed by GitHub
parent 50a3005c23
commit ef83777115
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 230 additions and 11 deletions

View File

@ -130,6 +130,11 @@ public class HapiExtensions {
*/
public static final String EX_RETRY_COUNT = "http://hapifhir.io/fhir/StructureDefinition/subscription-delivery-retry-count";
/**
* This extension provides a way for subscribers to indicate if DELETE messages must be sent (default is ignoring them)
*/
public static final String EX_SEND_DELETE_MESSAGES = "http://hapifhir.io/fhir/StructureDefinition/subscription-send-delete-messages";
/**
* Non instantiable
*/

View File

@ -0,0 +1,4 @@
---
type: add
issue: 3243
title: "Allow Rest Hook subscriptions to be configured to send delete requests."

View File

@ -17,6 +17,7 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
@ -109,7 +110,14 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
protected Subscription createSubscription(String theCriteria, String thePayload) {
return createSubscription(theCriteria, thePayload, null);
}
protected Subscription createSubscription(String theCriteria, String thePayload, Extension theExtension) {
Subscription subscription = newSubscription(theCriteria, thePayload);
if (theExtension != null) {
subscription.getExtension().add(theExtension);
}
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
@ -15,6 +16,7 @@ import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Meta;
import org.hl7.fhir.r4.model.Observation;
@ -32,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -514,6 +517,39 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals("changed", observation2.getNoteFirstRep().getText());
}
@Test
public void RestHookSubscriptionWithPayloadSendsDeleteRequest() throws Exception {
String payload = "application/json";
String criteria1 = "[*]";
Extension sendDeleteMessagesExtension = new Extension()
.setUrl(EX_SEND_DELETE_MESSAGES)
.setValue(new BooleanType(true));
waitForActivatedSubscriptionCount(0);
createSubscription(criteria1, payload, sendDeleteMessagesExtension);
waitForActivatedSubscriptionCount(1);
myStoppableSubscriptionDeliveringRestHookSubscriber.pause();
final CountDownLatch countDownLatch = new CountDownLatch(1);
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch);
ourLog.info("** About to send observation");
Observation observation = sendObservation("OB-01", "SNOMED-CT");
assertEquals("1", observation.getIdElement().getVersionIdPart());
// Wait for our delivery channel thread to be paused
assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
// Open the floodgates!
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
ourLog.info("** About to delete observation");
myObservationDao.delete(IdDt.of(observation).toUnqualifiedVersionless());
ourObservationProvider.waitForDeleteCount(1);
}
@Test
public void testRestHookSubscriptionGetsLatestVersionWithFlag() throws Exception {
String payload = "application/json";

View File

@ -45,6 +45,7 @@ import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.util.BundleBuilder;
import org.apache.commons.text.StringSubstitutor;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -164,11 +165,11 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
return operation;
}
public IBaseResource getResource(IIdType payloadId, RequestPartitionId thePartitionId) throws ResourceGoneException {
public IBaseResource getResource(IIdType payloadId, RequestPartitionId thePartitionId, boolean theDeletedOK) throws ResourceGoneException {
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(payloadId.getResourceType());
SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(thePartitionId);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass());
return dao.read(payloadId.toVersionless(), systemRequestDetails);
return dao.read(payloadId.toVersionless(), systemRequestDetails, theDeletedOK);
}
@ -180,7 +181,8 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
try {
if (payloadId != null) {
payloadResource = getResource(payloadId.toVersionless(), theMsg.getRequestPartitionId());
boolean deletedOK = theMsg.getOperationType() == BaseResourceModifiedMessage.OperationTypeEnum.DELETE;
payloadResource = getResource(payloadId.toVersionless(), theMsg.getRequestPartitionId(), deletedOK);
} else {
return null;
}

View File

@ -28,6 +28,7 @@ import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull;
import java.util.Collection;
import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -91,8 +92,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
case CREATE:
case UPDATE:
case MANUALLY_TRIGGERED:
break;
case DELETE:
break;
default:
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
// ignore anything else
@ -143,6 +144,13 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
continue;
}
if (theMsg.getOperationType().equals(DELETE)) {
if (! nextActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return;
}
}
InMemoryMatchResult matchResult;
if (nextActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
@ -169,7 +177,11 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());

View File

@ -1,24 +1,43 @@
package ca.uhn.fhir.jpa.subscription.module.subscriber;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import com.google.common.collect.Lists;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collections;
import java.util.List;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test
@ -29,7 +48,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@BeforeEach
public void beforeEach() {
Mockito.when(myMockSubscriptionDao.getResourceType()).thenReturn(Subscription.class);
when(myMockSubscriptionDao.getResourceType()).thenReturn(Subscription.class);
myDaoRegistry.register(myMockSubscriptionDao);
}
@ -264,11 +283,82 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionResourceNotMatched.awaitExpected();
}
@Nested
public class TestDeleteMessages {
private final SubscriptionMatchingSubscriber subscriber = new SubscriptionMatchingSubscriber();
@Mock ResourceModifiedMessage message;
@Mock IInterceptorBroadcaster myInterceptorBroadcaster;
@Mock SubscriptionRegistry mySubscriptionRegistry;
@Mock(answer = Answers.RETURNS_DEEP_STUBS) ActiveSubscription myActiveSubscription;
@Mock CanonicalSubscription myCanonicalSubscription;
@Mock SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;
@Test
public void testAreNotIgnored() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
when(mySubscriptionRegistry.getAll()).thenReturn(Collections.emptyList());
subscriber.matchActiveSubscriptionsAndDeliver(message);
verify(myInterceptorBroadcaster).callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class));
verify(myInterceptorBroadcaster).callHooks(
eq(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED), any(HookParams.class));
}
@Test
public void matchActiveSubscriptionsChecksSendDeleteMessagesExtensionFlag() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
when(message.getPayloadId(null)).thenReturn(new IdDt("Patient", 123L));
when(mySubscriptionRegistry.getAll()).thenReturn(Collections.singletonList(myActiveSubscription));
when(myActiveSubscription.getSubscription()).thenReturn(myCanonicalSubscription);
when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myActiveSubscription.getId()).thenReturn("Patient/123");
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);
subscriber.matchActiveSubscriptionsAndDeliver(message);
verify(myCanonicalSubscription, atLeastOnce()).getSendDeleteMessages();
}
@Test
public void matchActiveSubscriptionsAndDeliverSetsPartitionId() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
when(message.getPayloadId(null)).thenReturn(new IdDt("Patient", 123L));
when(mySubscriptionRegistry.getAll()).thenReturn(Collections.singletonList(myActiveSubscription));
when(myActiveSubscription.getSubscription()).thenReturn(myCanonicalSubscription);
when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myActiveSubscription.getId()).thenReturn("Patient/123");
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);
when(myCanonicalSubscription.getSendDeleteMessages()).thenReturn(true);
subscriber.matchActiveSubscriptionsAndDeliver(message);
verify(message, atLeastOnce()).getPayloadId(null);
}
}
private void mockSubscriptionRead(RequestPartitionId theRequestPartitionId, Subscription subscription) {
Subscription modifiedSubscription = subscription.copy();
// the original partition info was the request info, but we need the actual storage partition.
modifiedSubscription.setUserData(Constants.RESOURCE_PARTITION_ID, theRequestPartitionId);
Mockito.when(myMockSubscriptionDao.read(eq(subscription.getIdElement()), any())).thenReturn(modifiedSubscription);
when(myMockSubscriptionDao.read(eq(subscription.getIdElement()), any())).thenReturn(modifiedSubscription);
}
}

View File

@ -38,6 +38,7 @@ import org.hl7.fhir.instance.model.api.IBaseMetaType;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
@ -268,6 +270,10 @@ public class SubscriptionCanonicalizer {
}
}
Extension extension = subscription.getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
if (extension != null && extension.hasValue() && extension.getValue() instanceof BooleanType) {
retVal.setSendDeleteMessages(((BooleanType) extension.getValue()).booleanValue());
}
return retVal;
}

View File

@ -43,6 +43,7 @@ import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class CanonicalSubscription implements Serializable, Cloneable, IModelJson {
private static final long serialVersionUID = 1L;
@JsonProperty("id")
@ -73,6 +74,8 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
private String myPayloadSearchCriteria;
@JsonProperty("partitionId")
private Integer myPartitionId;
@JsonProperty("sendDeleteMessages")
private boolean mySendDeleteMessages;
/**
* Constructor
@ -90,7 +93,7 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
}
/**
* For now, we're using the R4 TriggerDefinition, but this
* For now we're using the R4 TriggerDefinition, but this
* may change in the future when things stabilize
*/
public void addTrigger(CanonicalEventDefinition theTrigger) {
@ -237,13 +240,19 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
}
/**
* For now, we're using the R4 triggerdefinition, but this
* For now we're using the R4 triggerdefinition, but this
* may change in the future when things stabilize
*/
public CanonicalEventDefinition getTrigger() {
return myTrigger;
}
public boolean getSendDeleteMessages() { return mySendDeleteMessages; }
public void setSendDeleteMessages(boolean theSendDeleteMessages) {
mySendDeleteMessages = theSendDeleteMessages;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;

View File

@ -0,0 +1,42 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Test;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SubscriptionCanonicalizerTest {
FhirContext r4Context = FhirContext.forR4();
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context);
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValueNotPresent() {
Subscription subscription = new Subscription();
CanonicalSubscription canonicalSubscription = testedSC.canonicalize(subscription);
assertFalse(canonicalSubscription.getSendDeleteMessages());
}
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValue() {
Subscription subscription = new Subscription();
Extension sendDeleteMessagesExtension = new Extension()
.setUrl(EX_SEND_DELETE_MESSAGES)
.setValue(new BooleanType(true));
subscription.getExtension().add(sendDeleteMessagesExtension);
CanonicalSubscription canonicalSubscription = testedSC.canonicalize(subscription);
assertTrue(canonicalSubscription.getSendDeleteMessages());
}
}

View File

@ -89,12 +89,17 @@ public class HashMapResourceProviderExtension<T extends IBaseResource> extends H
public void waitForUpdateCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountUpdate()));
await().until(()->getCountUpdate(), equalTo(theCount));
await().until(this::getCountUpdate, equalTo(theCount));
}
public void waitForCreateCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountCreate()));
await().until(()->getCountCreate(), equalTo(theCount));
await().until(this::getCountCreate, equalTo(theCount));
}
public void waitForDeleteCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountDelete()));
await().until(this::getCountDelete, equalTo(theCount));
}
public List<T> getResourceUpdates() {