Resolve We don't have guaranteed subscription delivery if a resource is too large (#5414)
* first fix * - added the ability to handle null payload to SubscriptionDeliveringMessageSubscriber and SubscriptionDeliveringEmailSubscriber - refactored code to reduce repeated code - cleaned unnecessary comments and reformatted files * Changed myResourceModifiedMessagePersistenceSvc to be autowired * removed unused import * added error handling when inflating the message to email and message subscriber * reformatted code * Fixing subscription tests with mocked IResourceModifiedMessagePersistenceSvc * Changes by gary * Reformatted file * fixed failed tests * implemented test for message and email delivery subscriber. Fixed logical error. Reformatted File. * - implemented IT - fixed logical error - added changelog * fix for cdr tests, NOTE: this makes the assumption that we will always succeed for inflating the database in the tests that uses SynchronousSubscriptionMatcherInterceptor * fix for cdr tests, NOTE: this makes the assumption that we will always succeed for inflating the database in the tests that uses SynchronousSubscriptionMatcherInterceptor * resolve code review comments * reformatted files * fixed tests
This commit is contained in:
parent
7a25bfe847
commit
0831e0f05e
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
type: add
|
||||
issue: 5407
|
||||
title: "Previously, when the payload of a subscription message exceeds the broker maximum message size, exception would
|
||||
be thrown and retry will be performed indefinitely until the maximum message size is adjusted. Now, the message will be
|
||||
successfully delivered for rest-hook and email subscriptions, while message subscriptions remains the same behavior as
|
||||
before."
|
|
@ -35,6 +35,7 @@ import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
|
|||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.model.primitive.IdDt;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with;
|
||||
|
||||
|
@ -92,9 +94,43 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif
|
|||
|
||||
@Override
|
||||
public ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
|
||||
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
|
||||
ResourceModifiedMessage theResourceModifiedMessage) {
|
||||
|
||||
return inflateResourceModifiedMessageFromEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage);
|
||||
return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull(
|
||||
ResourceModifiedMessage theResourceModifiedMessage) {
|
||||
ResourceModifiedMessage inflatedResourceModifiedMessage = null;
|
||||
|
||||
try {
|
||||
inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage);
|
||||
} catch (ResourceNotFoundException e) {
|
||||
IdDt idDt = new IdDt(
|
||||
theResourceModifiedMessage.getPayloadType(myFhirContext),
|
||||
theResourceModifiedMessage.getPayloadId(),
|
||||
theResourceModifiedMessage.getPayloadVersion());
|
||||
|
||||
ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e);
|
||||
} catch (Exception ex) {
|
||||
ourLog.error("Unknown error encountered on inflation of resources.", ex);
|
||||
}
|
||||
|
||||
return Optional.ofNullable(inflatedResourceModifiedMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation(
|
||||
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
|
||||
ResourceModifiedMessage resourceModifiedMessage = getPayloadLessMessageFromString(
|
||||
((ResourceModifiedEntity) thePersistedResourceModifiedMessage).getSummaryResourceModifiedMessage());
|
||||
|
||||
IdDt resourceId =
|
||||
createIdDtFromResourceModifiedEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage);
|
||||
resourceModifiedMessage.setPayloadId(resourceId);
|
||||
|
||||
return resourceModifiedMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,17 +148,13 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif
|
|||
|
||||
protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity(
|
||||
ResourceModifiedEntity theResourceModifiedEntity) {
|
||||
String resourcePid =
|
||||
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
|
||||
String resourceVersion =
|
||||
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
|
||||
String resourceType = theResourceModifiedEntity.getResourceType();
|
||||
ResourceModifiedMessage retVal =
|
||||
getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage());
|
||||
SystemRequestDetails systemRequestDetails =
|
||||
new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId());
|
||||
|
||||
IdDt resourceIdDt = new IdDt(resourceType, resourcePid, resourceVersion);
|
||||
IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity);
|
||||
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType);
|
||||
|
||||
IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true);
|
||||
|
@ -164,6 +196,16 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif
|
|||
}
|
||||
}
|
||||
|
||||
private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) {
|
||||
String resourcePid =
|
||||
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
|
||||
String resourceVersion =
|
||||
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
|
||||
String resourceType = theResourceModifiedEntity.getResourceType();
|
||||
|
||||
return new IdDt(resourceType, resourcePid, resourceVersion);
|
||||
}
|
||||
|
||||
private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage {
|
||||
|
||||
public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) {
|
||||
|
|
|
@ -33,7 +33,9 @@ import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
|
|||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import ca.uhn.fhir.util.BundleBuilder;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
|
@ -48,6 +50,7 @@ import org.springframework.messaging.MessagingException;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest;
|
||||
|
||||
|
@ -60,6 +63,9 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
|
|||
@Autowired
|
||||
protected SubscriptionRegistry mySubscriptionRegistry;
|
||||
|
||||
@Autowired
|
||||
protected IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
|
||||
@Autowired
|
||||
private IInterceptorBroadcaster myInterceptorBroadcaster;
|
||||
|
||||
|
@ -149,6 +155,13 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
|
|||
return builder.getBundle();
|
||||
}
|
||||
|
||||
protected Optional<ResourceModifiedMessage> inflateResourceModifiedMessageFromDeliveryMessage(
|
||||
ResourceDeliveryMessage theMsg) {
|
||||
ResourceModifiedMessage payloadLess =
|
||||
new ResourceModifiedMessage(theMsg.getPayloadId(myFhirContext), theMsg.getOperationType());
|
||||
return myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(payloadLess);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setFhirContextForUnitTest(FhirContext theCtx) {
|
||||
myFhirContext = theCtx;
|
||||
|
@ -174,6 +187,12 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
|
|||
myMatchUrlService = theMatchUrlService;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setResourceModifiedMessagePersistenceSvcForUnitTest(
|
||||
IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc) {
|
||||
myResourceModifiedMessagePersistenceSvc = theResourceModifiedMessagePersistenceSvc;
|
||||
}
|
||||
|
||||
public IInterceptorBroadcaster getInterceptorBroadcaster() {
|
||||
return myInterceptorBroadcaster;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.model.entity.StorageSettings;
|
|||
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -33,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
@ -73,7 +75,7 @@ public class SubscriptionDeliveringEmailSubscriber extends BaseSubscriptionDeliv
|
|||
if (isNotBlank(subscription.getPayloadString())) {
|
||||
EncodingEnum encoding = EncodingEnum.forContentType(subscription.getPayloadString());
|
||||
if (encoding != null) {
|
||||
payload = theMessage.getPayloadString();
|
||||
payload = getPayloadStringFromMessageOrEmptyString(theMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,4 +114,24 @@ public class SubscriptionDeliveringEmailSubscriber extends BaseSubscriptionDeliv
|
|||
public IEmailSender getEmailSender() {
|
||||
return myEmailSender;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the payload string, fetch it from the DB when the payload is null.
|
||||
*/
|
||||
private String getPayloadStringFromMessageOrEmptyString(ResourceDeliveryMessage theMessage) {
|
||||
String payload = theMessage.getPayloadString();
|
||||
|
||||
if (theMessage.getPayload(myCtx) != null) {
|
||||
return payload;
|
||||
}
|
||||
|
||||
Optional<ResourceModifiedMessage> inflatedMessage =
|
||||
inflateResourceModifiedMessageFromDeliveryMessage(theMessage);
|
||||
if (inflatedMessage.isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
|
||||
payload = inflatedMessage.get().getPayloadString();
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.springframework.messaging.MessagingException;
|
|||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
|
@ -66,7 +67,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
|
|||
IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(
|
||||
theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
|
||||
ResourceModifiedJsonMessage newWrappedMessageToSend =
|
||||
convertDeliveryMessageToResourceModifiedMessage(theSourceMessage, payloadResource);
|
||||
convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, payloadResource);
|
||||
theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
|
||||
payloadId =
|
||||
payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
|
||||
|
@ -82,7 +83,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
|
|||
.getValue());
|
||||
}
|
||||
|
||||
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(
|
||||
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage(
|
||||
ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
|
||||
ResourceModifiedMessage payload =
|
||||
new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
|
||||
|
@ -96,8 +97,17 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
|
|||
public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
|
||||
CanonicalSubscription subscription = theMessage.getSubscription();
|
||||
IBaseResource payloadResource = theMessage.getPayload(myFhirContext);
|
||||
if (payloadResource == null) {
|
||||
Optional<ResourceModifiedMessage> inflatedMsg =
|
||||
inflateResourceModifiedMessageFromDeliveryMessage(theMessage);
|
||||
if (inflatedMsg.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
payloadResource = inflatedMsg.get().getPayload(myFhirContext);
|
||||
}
|
||||
|
||||
ResourceModifiedJsonMessage messageWrapperToSend =
|
||||
convertDeliveryMessageToResourceModifiedMessage(theMessage, payloadResource);
|
||||
convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, payloadResource);
|
||||
|
||||
// Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
|
||||
HookParams params = new HookParams()
|
||||
|
|
|
@ -31,6 +31,7 @@ import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
|||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import ca.uhn.fhir.subscription.SubscriptionConstants;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import ca.uhn.fhir.util.SubscriptionUtil;
|
||||
import org.hl7.fhir.dstu2.model.Subscription;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
@ -41,6 +42,7 @@ import org.springframework.messaging.Message;
|
|||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
import java.util.Optional;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
/**
|
||||
|
@ -64,6 +66,8 @@ public class SubscriptionActivatingSubscriber implements MessageHandler {
|
|||
@Autowired
|
||||
private StorageSettings myStorageSettings;
|
||||
|
||||
@Autowired
|
||||
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
|
@ -86,6 +90,16 @@ public class SubscriptionActivatingSubscriber implements MessageHandler {
|
|||
switch (payload.getOperationType()) {
|
||||
case CREATE:
|
||||
case UPDATE:
|
||||
if (payload.getPayload(myFhirContext) == null) {
|
||||
Optional<ResourceModifiedMessage> inflatedMsg =
|
||||
myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(
|
||||
payload);
|
||||
if (inflatedMsg.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
payload = inflatedMsg.get();
|
||||
}
|
||||
|
||||
activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext));
|
||||
break;
|
||||
case TRANSACTION:
|
||||
|
@ -104,7 +118,7 @@ public class SubscriptionActivatingSubscriber implements MessageHandler {
|
|||
*/
|
||||
public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) {
|
||||
// Grab the value for "Subscription.channel.type" so we can see if this
|
||||
// subscriber applies..
|
||||
// subscriber applies.
|
||||
CanonicalSubscriptionChannelType subscriptionChannelType =
|
||||
mySubscriptionCanonicalizer.getChannelType(theSubscription);
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.interceptor.api.HookParams;
|
|||
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
|
||||
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
|
@ -156,8 +157,21 @@ public class SubscriptionMatchDeliverer {
|
|||
ourLog.warn("Failed to send message to Delivery Channel.");
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
if (e.getCause() instanceof PayloadTooLargeException) {
|
||||
ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker "
|
||||
+ "max message size. Retry is about to be performed without payload.");
|
||||
ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg);
|
||||
trySendToDeliveryChannel(msgPayloadLess, theDeliveryChannel);
|
||||
} else {
|
||||
ourLog.error("Failed to send message to Delivery Channel", e);
|
||||
throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) {
|
||||
ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload();
|
||||
resourceDeliveryMessage.setPayloadToNull();
|
||||
return new ResourceDeliveryJsonMessage(resourceDeliveryMessage);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
|||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -40,6 +41,7 @@ import org.springframework.messaging.MessageHandler;
|
|||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE;
|
||||
|
@ -64,6 +66,9 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
|||
@Autowired
|
||||
private SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
|
||||
|
||||
@Autowired
|
||||
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
|
@ -97,6 +102,16 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
if (theMsg.getPayload(myFhirContext) == null) {
|
||||
// inflate the message and ignore any resource that cannot be found.
|
||||
Optional<ResourceModifiedMessage> inflatedMsg =
|
||||
myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(theMsg);
|
||||
if (inflatedMsg.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
theMsg = inflatedMsg.get();
|
||||
}
|
||||
|
||||
// Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED
|
||||
HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg);
|
||||
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
|
||||
|
|
|
@ -20,9 +20,11 @@
|
|||
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
|
||||
|
||||
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException;
|
||||
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
|
@ -49,11 +51,33 @@ public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatch
|
|||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
|
||||
doSubmitResourceModified(theResourceModifiedMessage);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doSubmitResourceModified(theResourceModifiedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit the message through the broker channel to the matcher.
|
||||
*
|
||||
* Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore
|
||||
* does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e.
|
||||
* with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them,
|
||||
* we should null out the payload body before submitting. This try-catch block only covers the case where the
|
||||
* payload is too large, which is enough for now. However, for better practice we might want to consider splitting
|
||||
* this interceptor into two, each for tests with/without DB connection.
|
||||
* @param theResourceModifiedMessage
|
||||
*/
|
||||
private void doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) {
|
||||
try {
|
||||
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
|
||||
} catch (MessageDeliveryException e) {
|
||||
if (e.getCause() instanceof PayloadTooLargeException) {
|
||||
theResourceModifiedMessage.setPayloadToNull();
|
||||
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
|||
import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.r5.model.IdType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
|
@ -45,8 +44,6 @@ import org.springframework.messaging.MessageDeliveryException;
|
|||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
|
||||
|
||||
/**
|
||||
|
@ -151,12 +148,11 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
boolean wasDeleted = deletePersistedResourceModifiedMessage(
|
||||
thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk());
|
||||
|
||||
Optional<ResourceModifiedMessage> optionalResourceModifiedMessage =
|
||||
inflatePersistedResourceMessage(thePersistedResourceModifiedMessage);
|
||||
// submit the resource modified message with empty payload, actual inflation is done by the matcher.
|
||||
resourceModifiedMessage =
|
||||
createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage);
|
||||
|
||||
if (wasDeleted && optionalResourceModifiedMessage.isPresent()) {
|
||||
// the PK did exist and we were able to deleted it, ie, we are the only one processing the message
|
||||
resourceModifiedMessage = optionalResourceModifiedMessage.get();
|
||||
if (wasDeleted) {
|
||||
submitResourceModified(resourceModifiedMessage);
|
||||
}
|
||||
} catch (MessageDeliveryException exception) {
|
||||
|
@ -186,32 +182,10 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
|
|||
};
|
||||
}
|
||||
|
||||
private Optional<ResourceModifiedMessage> inflatePersistedResourceMessage(
|
||||
private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation(
|
||||
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
|
||||
ResourceModifiedMessage resourceModifiedMessage = null;
|
||||
|
||||
try {
|
||||
resourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(
|
||||
return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(
|
||||
thePersistedResourceModifiedMessage);
|
||||
|
||||
} catch (ResourceNotFoundException e) {
|
||||
IPersistedResourceModifiedMessagePK persistedResourceModifiedMessagePk =
|
||||
thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk();
|
||||
|
||||
IdType idType = new IdType(
|
||||
thePersistedResourceModifiedMessage.getResourceType(),
|
||||
persistedResourceModifiedMessagePk.getResourcePid(),
|
||||
persistedResourceModifiedMessagePk.getResourceVersion());
|
||||
|
||||
ourLog.warn(
|
||||
"Scheduled submission will be ignored since resource {} cannot be found",
|
||||
idType.asStringValue(),
|
||||
e);
|
||||
} catch (Exception ex) {
|
||||
ourLog.error("Unknown error encountered on inflation of resources.", ex);
|
||||
}
|
||||
|
||||
return Optional.ofNullable(resourceModifiedMessage);
|
||||
}
|
||||
|
||||
private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
|
||||
|
|
|
@ -8,10 +8,13 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
|
|||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
|
||||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
|
||||
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
|
||||
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.match.deliver.message.SubscriptionDeliveringMessageSubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||
|
@ -26,6 +29,7 @@ import ca.uhn.fhir.rest.client.api.IGenericClient;
|
|||
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
|
||||
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.IdType;
|
||||
|
@ -33,6 +37,8 @@ import org.hl7.fhir.r4.model.Patient;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
|
@ -57,6 +63,7 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
|
@ -71,6 +78,7 @@ public class BaseSubscriptionDeliverySubscriberTest {
|
|||
|
||||
private SubscriptionDeliveringRestHookSubscriber mySubscriber;
|
||||
private SubscriptionDeliveringMessageSubscriber myMessageSubscriber;
|
||||
private SubscriptionDeliveringEmailSubscriber myEmailSubscriber;
|
||||
private final FhirContext myCtx = FhirContext.forR4();
|
||||
|
||||
@Mock
|
||||
|
@ -96,6 +104,12 @@ public class BaseSubscriptionDeliverySubscriberTest {
|
|||
@Mock
|
||||
private MatchUrlService myMatchUrlService;
|
||||
|
||||
@Mock
|
||||
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
|
||||
@Mock
|
||||
private IEmailSender myEmailSender;
|
||||
|
||||
@BeforeEach
|
||||
public void before() {
|
||||
mySubscriber = new SubscriptionDeliveringRestHookSubscriber();
|
||||
|
@ -109,8 +123,15 @@ public class BaseSubscriptionDeliverySubscriberTest {
|
|||
myMessageSubscriber.setSubscriptionRegistryForUnitTest(mySubscriptionRegistry);
|
||||
myMessageSubscriber.setDaoRegistryForUnitTest(myDaoRegistry);
|
||||
myMessageSubscriber.setMatchUrlServiceForUnitTest(myMatchUrlService);
|
||||
myMessageSubscriber.setResourceModifiedMessagePersistenceSvcForUnitTest(myResourceModifiedMessagePersistenceSvc);
|
||||
myCtx.setRestfulClientFactory(myRestfulClientFactory);
|
||||
when(myRestfulClientFactory.newGenericClient(any())).thenReturn(myGenericClient);
|
||||
|
||||
myEmailSubscriber = new SubscriptionDeliveringEmailSubscriber(myEmailSender);
|
||||
myEmailSubscriber.setFhirContextForUnitTest(myCtx);
|
||||
myEmailSubscriber.setInterceptorBroadcasterForUnitTest(myInterceptorBroadcaster);
|
||||
myEmailSubscriber.setSubscriptionRegistryForUnitTest(mySubscriptionRegistry);
|
||||
myEmailSubscriber.setResourceModifiedMessagePersistenceSvcForUnitTest(myResourceModifiedMessagePersistenceSvc);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -400,6 +421,38 @@ public class BaseSubscriptionDeliverySubscriberTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {"message", "email"})
|
||||
public void testMessageAndEmailSubscriber_whenPayloadIsNull_shouldTryInflateMessage(String theSubscriber) {
|
||||
// setup
|
||||
when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true);
|
||||
|
||||
Patient patient = generatePatient();
|
||||
|
||||
CanonicalSubscription subscription = generateSubscription();
|
||||
|
||||
ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
|
||||
payload.setSubscription(subscription);
|
||||
payload.setPayload(myCtx, patient, EncodingEnum.JSON);
|
||||
payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
|
||||
|
||||
// mock the inflated message
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(any());
|
||||
|
||||
// this will null out the payload but keep the resource id and version.
|
||||
payload.setPayloadToNull();
|
||||
|
||||
// execute & verify
|
||||
switch (theSubscriber) {
|
||||
case "message" ->
|
||||
assertThrows(MessagingException.class, () -> myMessageSubscriber.handleMessage(new ResourceDeliveryJsonMessage(payload)));
|
||||
case "email" ->
|
||||
assertThrows(MessagingException.class, () -> myEmailSubscriber.handleMessage(new ResourceDeliveryJsonMessage(payload)));
|
||||
}
|
||||
|
||||
verify(myResourceModifiedMessagePersistenceSvc, times(1)).inflatePersistedResourceModifiedMessageOrNull(any());
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Patient generatePatient() {
|
||||
Patient patient = new Patient();
|
||||
|
|
|
@ -15,6 +15,7 @@ import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
|
|||
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
|
||||
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
|
||||
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -90,6 +91,11 @@ public class DaoSubscriptionMatcherTest {
|
|||
public IEmailSender emailSender(){
|
||||
return mock(IEmailSender.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc() {
|
||||
return mock(IResourceModifiedMessagePersistenceSvc.class);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,8 +2,10 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
|||
|
||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import org.hl7.fhir.dstu3.model.Subscription;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
@ -18,6 +20,9 @@ public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest
|
|||
|
||||
private static final String OTHER_ID = "OTHER_ID";
|
||||
|
||||
@Autowired
|
||||
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
|
||||
@Configuration
|
||||
public static class SpringConfig {
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
|||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.hl7.fhir.dstu3.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -62,4 +63,9 @@ public class TestSubscriptionDstu3Config {
|
|||
return mock;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc() {
|
||||
return mock(IResourceModifiedMessagePersistenceSvc.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.api.Constants;
|
|||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import ca.uhn.fhir.test.utilities.JettyUtil;
|
||||
import ca.uhn.test.concurrency.IPointcutLatch;
|
||||
import ca.uhn.test.concurrency.PointcutLatch;
|
||||
|
@ -54,6 +55,10 @@ import javax.servlet.http.HttpServletRequest;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test {
|
||||
public static final ChannelConsumerSettings CONSUMER_OPTIONS = new ChannelConsumerSettings().setConcurrentConsumers(1);
|
||||
|
@ -100,6 +105,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
|
|||
IInterceptorService myInterceptorRegistry;
|
||||
@Autowired
|
||||
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
|
||||
@Autowired
|
||||
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
|
||||
@BeforeEach
|
||||
public void beforeReset() {
|
||||
|
@ -140,6 +147,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
|
|||
public <T extends IBaseResource> T sendResource(T theResource, RequestPartitionId theRequestPartitionId) throws InterruptedException {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, null, theRequestPartitionId);
|
||||
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.of(msg));
|
||||
|
||||
mySubscriptionMatchingPost.setExpectedCount(1);
|
||||
ourSubscribableChannel.send(message);
|
||||
mySubscriptionMatchingPost.awaitExpected();
|
||||
|
|
|
@ -17,6 +17,7 @@ import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscriba
|
|||
import ca.uhn.fhir.model.primitive.IdDt;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import ca.uhn.fhir.util.HapiExtensions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.hl7.fhir.dstu3.model.BooleanType;
|
||||
|
@ -33,6 +34,7 @@ import org.mockito.Mockito;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
@ -434,6 +436,8 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
|
|||
SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;
|
||||
@Mock
|
||||
SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
|
||||
@Mock
|
||||
IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
|
||||
@InjectMocks
|
||||
SubscriptionMatchingSubscriber subscriber;
|
||||
|
||||
|
@ -445,6 +449,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
|
|||
when(myInterceptorBroadcaster.callHooks(
|
||||
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
|
||||
when(mySubscriptionRegistry.getAll()).thenReturn(Collections.emptyList());
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message));
|
||||
|
||||
subscriber.matchActiveSubscriptionsAndDeliver(message);
|
||||
|
||||
|
@ -465,6 +470,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
|
|||
when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
|
||||
when(myActiveSubscription.getId()).thenReturn("Patient/123");
|
||||
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message));
|
||||
|
||||
subscriber.matchActiveSubscriptionsAndDeliver(message);
|
||||
|
||||
|
@ -486,6 +492,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
|
|||
when(myNonDeleteSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
|
||||
when(myNonDeleteSubscription.getId()).thenReturn("Patient/123");
|
||||
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message));
|
||||
|
||||
subscriber.matchActiveSubscriptionsAndDeliver(message);
|
||||
|
||||
|
@ -505,6 +512,7 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
|
|||
when(myActiveSubscription.getId()).thenReturn("Patient/123");
|
||||
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);
|
||||
when(myCanonicalSubscription.getSendDeleteMessages()).thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(any())).thenReturn(Optional.ofNullable(message));
|
||||
|
||||
subscriber.matchActiveSubscriptionsAndDeliver(message);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
|||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
|
||||
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
|
||||
import org.hl7.fhir.r4.model.IdType;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -146,5 +147,10 @@ public class WebsocketConnectionValidatorTest {
|
|||
return mock(IEmailSender.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc(){
|
||||
return mock(IResourceModifiedMessagePersistenceSvc.class);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -253,22 +253,24 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPersistedResourceModifiedMessage_whenFetchFromDb_willEqualOriginalMessage() throws JsonProcessingException {
|
||||
public void testMethodInflatePersistedResourceModifiedMessage_whenGivenResourceModifiedMessageWithEmptyPayload_willEqualOriginalMessage() {
|
||||
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
|
||||
// given
|
||||
// setup
|
||||
TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
|
||||
Observation obs = sendObservation("zoop", "SNOMED-CT", "theExplicitSource", "theRequestId");
|
||||
|
||||
ResourceModifiedMessage originalResourceModifiedMessage = createResourceModifiedMessage(obs);
|
||||
ResourceModifiedMessage resourceModifiedMessageWithEmptyPayload = createResourceModifiedMessage(obs);
|
||||
resourceModifiedMessageWithEmptyPayload.setPayloadToNull();
|
||||
|
||||
transactionTemplate.execute(tx -> {
|
||||
|
||||
IPersistedResourceModifiedMessage persistedResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.persist(originalResourceModifiedMessage);
|
||||
myResourceModifiedMessagePersistenceSvc.persist(originalResourceModifiedMessage);
|
||||
|
||||
// when
|
||||
ResourceModifiedMessage restoredResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(persistedResourceModifiedMessage);
|
||||
// execute
|
||||
ResourceModifiedMessage restoredResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(resourceModifiedMessageWithEmptyPayload);
|
||||
|
||||
// then
|
||||
// verify
|
||||
assertEquals(toJson(originalResourceModifiedMessage), toJson(restoredResourceModifiedMessage));
|
||||
assertEquals(originalResourceModifiedMessage, restoredResourceModifiedMessage);
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
// given
|
||||
// a successful deletion implies that the message did exist.
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
|
||||
when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage());
|
||||
|
||||
// when
|
||||
boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity());
|
||||
|
@ -134,7 +134,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
// when
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any()))
|
||||
.thenThrow(new RuntimeException(deleteExMsg));
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any()))
|
||||
when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any()))
|
||||
.thenThrow(new RuntimeException(inflationExMsg));
|
||||
|
||||
// test
|
||||
|
@ -180,7 +180,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
// when
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any()))
|
||||
.thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any()))
|
||||
when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any()))
|
||||
.thenReturn(msg);
|
||||
when(myChannelProducer.send(any()))
|
||||
.thenThrow(new RuntimeException(exceptionString));
|
||||
|
@ -206,7 +206,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
// given
|
||||
// deletion fails, someone else was faster and processed the message
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(false);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
|
||||
when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage());
|
||||
|
||||
// when
|
||||
boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity());
|
||||
|
@ -223,7 +223,7 @@ public class ResourceModifiedSubmitterSvcTest {
|
|||
public void testSubmitPersistedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){
|
||||
// given
|
||||
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
|
||||
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
|
||||
when(myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(any())).thenReturn(new ResourceModifiedMessage());
|
||||
|
||||
// simulate failure writing to the channel
|
||||
when(myChannelProducer.send(any())).thenThrow(new MessageDeliveryException("sendingError"));
|
||||
|
|
|
@ -52,15 +52,15 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im
|
|||
@JsonProperty(value = "partitionId")
|
||||
protected RequestPartitionId myPartitionId;
|
||||
|
||||
@JsonProperty(value = "payloadVersion")
|
||||
protected String myPayloadVersion;
|
||||
|
||||
@JsonIgnore
|
||||
protected transient IBaseResource myPayloadDecoded;
|
||||
|
||||
@JsonIgnore
|
||||
protected transient String myPayloadType;
|
||||
|
||||
@JsonIgnore
|
||||
protected String myPayloadVersion;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
|
@ -68,6 +68,12 @@ public abstract class BaseResourceModifiedMessage extends BaseResourceMessage im
|
|||
super();
|
||||
}
|
||||
|
||||
public BaseResourceModifiedMessage(IIdType theIdType, OperationTypeEnum theOperationType) {
|
||||
this();
|
||||
setOperationType(theOperationType);
|
||||
setPayloadId(theIdType);
|
||||
}
|
||||
|
||||
public BaseResourceModifiedMessage(
|
||||
FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
||||
this();
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package ca.uhn.fhir.jpa.subscription.channel.api;
|
||||
|
||||
/**
|
||||
* This exception represents the message payload exceeded the maximum message size of the broker. Used as a wrapper of
|
||||
* similar exceptions specific to different message brokers, e.g. kafka.common.errors.RecordTooLargeException.
|
||||
*/
|
||||
public class PayloadTooLargeException extends RuntimeException {
|
||||
|
||||
public PayloadTooLargeException(String theMessage) {
|
||||
super(theMessage);
|
||||
}
|
||||
|
||||
public PayloadTooLargeException(String theMessage, Throwable theThrowable) {
|
||||
super(theMessage, theThrowable);
|
||||
}
|
||||
}
|
|
@ -108,6 +108,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
|
|||
myPayloadId = thePayload.getIdElement().toUnqualifiedVersionless().getValue();
|
||||
}
|
||||
|
||||
public void setPayloadToNull() {
|
||||
myPayloadString = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPayloadId() {
|
||||
return myPayloadId;
|
||||
|
|
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
/**
|
||||
* Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage
|
||||
|
@ -47,6 +48,11 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
|
|||
super();
|
||||
}
|
||||
|
||||
public ResourceModifiedMessage(IIdType theIdType, OperationTypeEnum theOperationType) {
|
||||
super(theIdType, theOperationType);
|
||||
setPartitionId(RequestPartitionId.defaultPartition());
|
||||
}
|
||||
|
||||
public ResourceModifiedMessage(
|
||||
FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
|
||||
super(theFhirContext, theResource, theOperationType);
|
||||
|
@ -79,6 +85,10 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
|
|||
mySubscriptionId = theSubscriptionId;
|
||||
}
|
||||
|
||||
public void setPayloadToNull() {
|
||||
myPayload = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this)
|
||||
|
|
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
|
|||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* An implementer of this interface will provide {@link ResourceModifiedMessage} persistence services.
|
||||
|
@ -61,10 +62,29 @@ public interface IResourceModifiedMessagePersistenceSvc {
|
|||
/**
|
||||
* Restore a resourceModifiedMessage to its pre persistence representation.
|
||||
*
|
||||
* @param thePersistedResourceModifiedMessage The message needing restoration.
|
||||
* @param theResourceModifiedMessage The message needing restoration.
|
||||
* @return The resourceModifiedMessage in its pre persistence form.
|
||||
*/
|
||||
ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
|
||||
ResourceModifiedMessage inflatePersistedResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage);
|
||||
|
||||
/**
|
||||
* Restore a resourceModifiedMessage to its pre persistence representation or null if the resource does not exist.
|
||||
*
|
||||
* @param theResourceModifiedMessage
|
||||
* @return An Optional containing The resourceModifiedMessage in its pre persistence form or null when the resource
|
||||
* does not exist
|
||||
*/
|
||||
Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull(
|
||||
ResourceModifiedMessage theResourceModifiedMessage);
|
||||
|
||||
/**
|
||||
* Create a ResourceModifiedMessage without its pre persistence representation, i.e. without the resource body in
|
||||
* payload
|
||||
*
|
||||
* @param thePersistedResourceModifiedMessage The message needing creation
|
||||
* @return The resourceModifiedMessage without its pre persistence form
|
||||
*/
|
||||
ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation(
|
||||
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage);
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue