Merge branch 'master' of github.com:jamesagnew/hapi-fhir

This commit is contained in:
James Agnew 2018-08-11 09:56:09 -04:00
commit b9b85f5ba9
8 changed files with 42 additions and 78 deletions

View File

@ -32,7 +32,6 @@ import org.springframework.messaging.MessagingException;
public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber { public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber {
private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class); private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
private boolean myReloadResourceBeforeDelivery = true;
public BaseSubscriptionDeliverySubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) { public BaseSubscriptionDeliverySubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor); super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
@ -51,35 +50,30 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
subscriptionId = msg.getSubscription().getIdElement(getContext()).getValue(); subscriptionId = msg.getSubscription().getIdElement(getContext()).getValue();
if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription(getContext()))) {
return;
}
CanonicalSubscription updatedSubscription = (CanonicalSubscription) getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart()); CanonicalSubscription updatedSubscription = (CanonicalSubscription) getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart());
if (updatedSubscription != null) { if (updatedSubscription != null) {
msg.setSubscription(updatedSubscription); msg.setSubscription(updatedSubscription);
} }
if (myReloadResourceBeforeDelivery) { if (!subscriptionTypeApplies(msg.getSubscription())) {
// Reload the payload just in case any interceptors modified return;
// it before it was saved to the database. This is also
// useful for resources created in a transaction, since they
// can have placeholder IDs in them.
IIdType payloadId = msg.getPayloadId(getContext());
Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
IBaseResource loadedPayload;
try {
loadedPayload = dao.read(payloadId);
} catch (ResourceNotFoundException e) {
// This can happen if a last minute failure happens when saving a resource,
// eg a constraint causes the transaction to roll back on commit
ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
return;
}
msg.setPayload(getContext(), loadedPayload);
} }
// Load the resource
IIdType payloadId = msg.getPayloadId(getContext());
Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
IBaseResource loadedPayload;
try {
loadedPayload = dao.read(payloadId);
} catch (ResourceNotFoundException e) {
// This can happen if a last minute failure happens when saving a resource,
// eg a constraint causes the transaction to roll back on commit
ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
return;
}
msg.setPayload(getContext(), loadedPayload);
handleMessage(msg); handleMessage(msg);
} catch (Exception e) { } catch (Exception e) {
String msg = "Failure handling subscription payload for subscription: " + subscriptionId; String msg = "Failure handling subscription payload for subscription: " + subscriptionId;
@ -90,8 +84,4 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception; public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
public void setReloadResourceBeforeDelivery(boolean theReloadResourceBeforeDelivery) {
myReloadResourceBeforeDelivery = theReloadResourceBeforeDelivery;
}
} }

View File

@ -70,10 +70,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
static final String SUBSCRIPTION_STATUS = "Subscription.status"; static final String SUBSCRIPTION_STATUS = "Subscription.status";
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
static final String SUBSCRIPTION_CRITERIA = "Subscription.criteria";
static final String SUBSCRIPTION_ENDPOINT = "Subscription.channel.endpoint";
static final String SUBSCRIPTION_PAYLOAD = "Subscription.channel.payload";
static final String SUBSCRIPTION_HEADER = "Subscription.channel.header";
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000; private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
private SubscribableChannel myProcessingChannel; private SubscribableChannel myProcessingChannel;
private SubscribableChannel myDeliveryChannel; private SubscribableChannel myDeliveryChannel;
@ -128,7 +124,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
try { try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType())); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -147,7 +142,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
try { try {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode())); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode()));
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -193,7 +187,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
CanonicalSubscription retVal = new CanonicalSubscription(); CanonicalSubscription retVal = new CanonicalSubscription();
retVal.setStatus(subscription.getStatus()); retVal.setStatus(subscription.getStatus());
retVal.setBackingSubscription(myCtx, theSubscription);
retVal.setChannelType(subscription.getChannel().getType()); retVal.setChannelType(subscription.getChannel().getType());
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -204,7 +197,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) { if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
String from; String from;
String subjectTemplate; String subjectTemplate;
String bodyTemplate;
try { try {
from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM); from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE); subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
@ -260,8 +252,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
myResourceTypeToDao = theResourceTypeToDao; myResourceTypeToDao = theResourceTypeToDao;
} }
IFhirResourceDao<R> dao = (IFhirResourceDao<R>) myResourceTypeToDao.get(theType); return (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
return dao;
} }
public SubscribableChannel getDeliveryChannel() { public SubscribableChannel getDeliveryChannel() {

View File

@ -22,8 +22,6 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
@ -62,19 +60,19 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
/** /**
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
*/ */
protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) { protected boolean subscriptionTypeApplies(CanonicalSubscription theSubscription) {
Subscription.SubscriptionChannelType channelType = getChannelType(); Subscription.SubscriptionChannelType channelType = getChannelType();
return subscriptionTypeApplies(theCtx, theSubscription, channelType); String subscriptionType = theSubscription.getChannelType().toCode();
return subscriptionTypeApplies(subscriptionType, channelType);
} }
/** /**
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
*/ */
static boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription, Subscription.SubscriptionChannelType theChannelType) { static boolean subscriptionTypeApplies(String theSubscriptionChannelTypeCode, Subscription.SubscriptionChannelType theChannelType) {
IPrimitiveType<?> subscriptionType = theCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class);
boolean subscriptionTypeApplies = false; boolean subscriptionTypeApplies = false;
if (subscriptionType != null) { if (theSubscriptionChannelTypeCode != null) {
if (theChannelType.toCode().equals(subscriptionType.getValueAsString())) { if (theChannelType.toCode().equals(theSubscriptionChannelTypeCode)) {
subscriptionTypeApplies = true; subscriptionTypeApplies = true;
} }
} }

View File

@ -22,12 +22,10 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.EventDefinition; import org.hl7.fhir.r4.model.EventDefinition;
@ -59,10 +57,6 @@ public class CanonicalSubscription implements Serializable {
private Subscription.SubscriptionChannelType myChannelType; private Subscription.SubscriptionChannelType myChannelType;
@JsonProperty("status") @JsonProperty("status")
private Subscription.SubscriptionStatus myStatus; private Subscription.SubscriptionStatus myStatus;
@JsonIgnore
private transient IBaseResource myBackingSubscription;
@JsonProperty("backingSubscription")
private String myBackingSubscriptionString;
@JsonProperty("triggerDefinition") @JsonProperty("triggerDefinition")
private CanonicalEventDefinition myTrigger; private CanonicalEventDefinition myTrigger;
@JsonProperty("emailDetails") @JsonProperty("emailDetails")
@ -91,13 +85,6 @@ public class CanonicalSubscription implements Serializable {
.isEquals(); .isEquals();
} }
public IBaseResource getBackingSubscription(FhirContext theCtx) {
if (myBackingSubscription == null && myBackingSubscriptionString != null) {
myBackingSubscription = theCtx.newJsonParser().parseResource(myBackingSubscriptionString);
}
return myBackingSubscription;
}
public Subscription.SubscriptionChannelType getChannelType() { public Subscription.SubscriptionChannelType getChannelType() {
return myChannelType; return myChannelType;
} }
@ -190,14 +177,6 @@ public class CanonicalSubscription implements Serializable {
.toHashCode(); .toHashCode();
} }
public void setBackingSubscription(FhirContext theCtx, IBaseResource theBackingSubscription) {
myBackingSubscription = theBackingSubscription;
myBackingSubscriptionString = null;
if (myBackingSubscription != null) {
myBackingSubscriptionString = theCtx.newJsonParser().encodeResourceToString(myBackingSubscription);
}
}
public void setHeaders(List<? extends IPrimitiveType<String>> theHeader) { public void setHeaders(List<? extends IPrimitiveType<String>> theHeader) {
myHeaders = new ArrayList<>(); myHeaders = new ArrayList<>();
for (IPrimitiveType<String> next : theHeader) { for (IPrimitiveType<String> next : theHeader) {

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.fasterxml.jackson.annotation.*; import com.fasterxml.jackson.annotation.*;
import com.google.gson.Gson; import com.google.gson.Gson;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -41,8 +42,6 @@ public class ResourceDeliveryMessage {
private String mySubscriptionString; private String mySubscriptionString;
@JsonIgnore @JsonIgnore
private transient IBaseResource myPayload; private transient IBaseResource myPayload;
@JsonProperty("payload")
private String myPayoadString;
@JsonProperty("payloadId") @JsonProperty("payloadId")
private String myPayloadId; private String myPayloadId;
@JsonProperty("operationType") @JsonProperty("operationType")
@ -57,9 +56,7 @@ public class ResourceDeliveryMessage {
} }
public IBaseResource getPayload(FhirContext theCtx) { public IBaseResource getPayload(FhirContext theCtx) {
if (myPayload == null && myPayoadString != null) { Validate.notNull(myPayload);
myPayload = theCtx.newJsonParser().parseResource(myPayoadString);
}
return myPayload; return myPayload;
} }
@ -87,7 +84,6 @@ public class ResourceDeliveryMessage {
public void setPayload(FhirContext theCtx, IBaseResource thePayload) { public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
myPayload = thePayload; myPayload = thePayload;
myPayoadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
} }
public void setPayloadId(IIdType thePayloadId) { public void setPayloadId(IIdType thePayloadId) {

View File

@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
@ -43,7 +42,6 @@ import org.springframework.transaction.support.TransactionSynchronizationAdapter
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import java.util.Date;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -73,7 +71,14 @@ public class SubscriptionActivatingSubscriber {
} }
public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) { public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(myCtx, theSubscription, myChannelType);
// Grab the value for "Subscription.channel.type" so we can see if this
// subscriber applies..
String subscriptionChannelType = myCtx
.newTerser()
.getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class)
.getValueAsString();
boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(subscriptionChannelType, myChannelType);
if (subscriptionTypeApplies == false) { if (subscriptionTypeApplies == false) {
return; return;
} }

View File

@ -1431,7 +1431,7 @@
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId> <artifactId>versions-maven-plugin</artifactId>
<version>2.5</version> <version>2.6-SNAPSHOT</version>
<configuration> <configuration>
<processDependencyManagementTransitive>false</processDependencyManagementTransitive> <processDependencyManagementTransitive>false</processDependencyManagementTransitive>
</configuration> </configuration>

View File

@ -244,6 +244,11 @@
a resource could be associated with the wrong entry in the response. a resource could be associated with the wrong entry in the response.
Thanks to GitHub user @jbalbien for the pull request! Thanks to GitHub user @jbalbien for the pull request!
</action> </action>
<action type="add">
JPA subscription delivery queues no longer store the resource body in the
queue (only the ID), which should reduce the memory/disk footprint of the queue
when it grows long.
</action>
</release> </release>
<release version="3.4.0" date="2018-05-28"> <release version="3.4.0" date="2018-05-28">
<action type="add"> <action type="add">