From 2fa912a43fccee48b51608d3584bb99818abd90f Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Sep 2017 22:01:36 -0400 Subject: [PATCH] Use serialized messages for queues --- .../BaseSubscriptionDeliverySubscriber.java | 4 +- .../BaseSubscriptionInterceptor.java | 14 ++-- .../BaseSubscriptionSubscriber.java | 10 ++- .../subscription/CanonicalSubscription.java | 76 ++++++++++++++----- .../subscription/ResourceDeliveryMessage.java | 53 +++++++++---- .../subscription/ResourceModifiedMessage.java | 34 ++++++--- .../SubscriptionActivatingSubscriber.java | 70 ++++++----------- .../SubscriptionCheckingSubscriber.java | 12 +-- ...scriptionDeliveringRestHookSubscriber.java | 6 +- .../SubscriptionWebsocketHandler.java | 7 +- .../ResourceProviderInterceptorDstu2Test.java | 2 +- .../fhir/rest/client/ReferenceClientTest.java | 2 +- 12 files changed, 178 insertions(+), 112 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java index 2a6afdaa5e8..6a0bde62437 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java @@ -41,11 +41,11 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio } try { ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); - if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription())) { + if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription(getContext()))) { return; } - CanonicalSubscription updatedSubscription = (CanonicalSubscription)getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement().getIdPart()); + CanonicalSubscription updatedSubscription = (CanonicalSubscription)getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart()); if (updatedSubscription != null) { msg.setSubscription(updatedSubscription); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index d07260038e6..b439040075f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -115,7 +115,7 @@ public abstract class BaseSubscriptionInterceptor exten CanonicalSubscription retVal = new CanonicalSubscription(); try { retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus())); - retVal.setBackingSubscription(theSubscription); + retVal.setBackingSubscription(myCtx, theSubscription); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType())); retVal.setCriteriaString(subscription.getCriteria()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); @@ -134,7 +134,7 @@ public abstract class BaseSubscriptionInterceptor exten CanonicalSubscription retVal = new CanonicalSubscription(); try { retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode())); - retVal.setBackingSubscription(theSubscription); + retVal.setBackingSubscription(myCtx, theSubscription); retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode())); retVal.setCriteriaString(subscription.getCriteria()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); @@ -169,7 +169,7 @@ public abstract class BaseSubscriptionInterceptor exten CanonicalSubscription retVal = new CanonicalSubscription(); retVal.setStatus(subscription.getStatus()); - retVal.setBackingSubscription(theSubscription); + retVal.setBackingSubscription(myCtx, theSubscription); retVal.setChannelType(subscription.getChannel().getType()); retVal.setCriteriaString(subscription.getCriteria()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); @@ -201,7 +201,7 @@ public abstract class BaseSubscriptionInterceptor exten } org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement()); - retVal.addTrigger(def.getTrigger()); + retVal.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(def)); } return retVal; @@ -325,7 +325,7 @@ public abstract class BaseSubscriptionInterceptor exten ResourceModifiedMessage msg = new ResourceModifiedMessage(); msg.setId(theResource.getIdElement()); msg.setOperationType(RestOperationTypeEnum.CREATE); - msg.setNewPayload(theResource); + msg.setNewPayload(myCtx, theResource); submitResourceModified(msg); } @@ -342,7 +342,7 @@ public abstract class BaseSubscriptionInterceptor exten ResourceModifiedMessage msg = new ResourceModifiedMessage(); msg.setId(theNewResource.getIdElement()); msg.setOperationType(RestOperationTypeEnum.UPDATE); - msg.setNewPayload(theNewResource); + msg.setNewPayload(myCtx, theNewResource); submitResourceModified(msg); } @@ -443,7 +443,7 @@ public abstract class BaseSubscriptionInterceptor exten protected void submitResourceModified(final ResourceModifiedMessage theMsg) { final GenericMessage message = new GenericMessage<>(theMsg); - mySubscriptionActivatingSubscriber.handleMessage(message); + mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx)); sendToProcessingChannel(message); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java index a0e88a2350e..d91d7a5831f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionSubscriber.java @@ -72,9 +72,17 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler { * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? */ protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) { + Subscription.SubscriptionChannelType channelType = getChannelType(); + return subscriptionTypeApplies(theCtx, theSubscription, channelType); + } + + /** + * Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor? + */ + static boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription, Subscription.SubscriptionChannelType theChannelType) { IPrimitiveType status = theCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class); boolean subscriptionTypeApplies = false; - if (getChannelType().toCode().equals(status.getValueAsString())) { + if (theChannelType.toCode().equals(status.getValueAsString())) { subscriptionTypeApplies = true; } return subscriptionTypeApplies; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java index c405e62c4fb..1f542f7041d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java @@ -20,13 +20,16 @@ package ca.uhn.fhir.jpa.subscription; * #L% */ +import ca.uhn.fhir.context.FhirContext; +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.EventDefinition; import org.hl7.fhir.r4.model.Subscription; -import org.hl7.fhir.r4.model.TriggerDefinition; import java.io.Serializable; import java.util.ArrayList; @@ -36,24 +39,35 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class CanonicalSubscription implements Serializable { - private static final long serialVersionUID = 364269017L; + private static final long serialVersionUID = 1L; - private IIdType myIdElement; + @SerializedName("id") + private String myIdElement; + @SerializedName("criteria") private String myCriteriaString; + @SerializedName("endpointUrl") private String myEndpointUrl; + @SerializedName("payload") private String myPayloadString; + @SerializedName("headers") private List myHeaders; + @SerializedName("channelType") private Subscription.SubscriptionChannelType myChannelType; + @SerializedName("status") private Subscription.SubscriptionStatus myStatus; - private IBaseResource myBackingSubscription; - private TriggerDefinition myTrigger; + private transient IBaseResource myBackingSubscription; + @SerializedName("backingSubscription") + private String myBackingSubscriptionString; + @SerializedName("triggerDefinition") + private CanonicalEventDefinition myTrigger; + @SerializedName("emailDetails") private EmailDetails myEmailDetails; /** * For now we're using the R4 TriggerDefinition, but this * may change in the future when things stabilize */ - public void addTrigger(TriggerDefinition theTrigger) { + public void addTrigger(CanonicalEventDefinition theTrigger) { myTrigger = theTrigger; } @@ -66,16 +80,27 @@ public class CanonicalSubscription implements Serializable { CanonicalSubscription that = (CanonicalSubscription) theO; return new EqualsBuilder() - .append(getIdElement().getIdPart(), that.getIdElement().getIdPart()) + .append(getIdElementString(), that.getIdElementString()) .isEquals(); } - public IBaseResource getBackingSubscription() { + public IBaseResource getBackingSubscription(FhirContext theCtx) { + if (myBackingSubscription == null && myBackingSubscriptionString != null) { + myBackingSubscription = theCtx.newJsonParser().parseResource(myBackingSubscriptionString); + } return myBackingSubscription; } - public void setBackingSubscription(IBaseResource theBackingSubscription) { + String getIdElementString() { + return myIdElement; + } + + public void setBackingSubscription(FhirContext theCtx, IBaseResource theBackingSubscription) { myBackingSubscription = theBackingSubscription; + myBackingSubscriptionString = null; + if (myBackingSubscription != null) { + myBackingSubscriptionString = theCtx.newJsonParser().encodeResourceToString(myBackingSubscription); + } } public Subscription.SubscriptionChannelType getChannelType() { @@ -122,12 +147,12 @@ public class CanonicalSubscription implements Serializable { } } - public IIdType getIdElement() { - return myIdElement; - } - - public void setIdElement(IIdType theIdElement) { - myIdElement = theIdElement; + public IIdType getIdElement(FhirContext theContext) { + IIdType retVal = null; + if (isNotBlank(myIdElement)) { + retVal = theContext.getVersion().newIdType().setValue(myIdElement); + } + return retVal; } public String getPayloadString() { @@ -150,14 +175,14 @@ public class CanonicalSubscription implements Serializable { * For now we're using the R4 triggerdefinition, but this * may change in the future when things stabilize */ - public TriggerDefinition getTrigger() { + public CanonicalEventDefinition getTrigger() { return myTrigger; } @Override public int hashCode() { return new HashCodeBuilder(17, 37) - .append(getIdElement().getIdPart()) + .append(getIdElementString()) .toHashCode(); } @@ -168,9 +193,19 @@ public class CanonicalSubscription implements Serializable { } } + public void setIdElement(IIdType theIdElement) { + myIdElement = null; + if (theIdElement != null) { + myIdElement = theIdElement.toUnqualifiedVersionless().getValue(); + } + } + public static class EmailDetails { + @SerializedName("from") private String myFrom; + @SerializedName("subjectTemplate") private String mySubjectTemplate; + @SerializedName("bodyTemplate") private String myBodyTemplate; public String getBodyTemplate() { @@ -198,4 +233,11 @@ public class CanonicalSubscription implements Serializable { } } + public static class CanonicalEventDefinition { + + public CanonicalEventDefinition(EventDefinition theDef) { + // nothing yet + } + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java index 206c9d3e0b5..442af42d730 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java @@ -20,7 +20,9 @@ package ca.uhn.fhir.jpa.subscription; * #L% */ +import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import com.google.gson.Gson; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -28,11 +30,13 @@ import java.io.Serializable; public class ResourceDeliveryMessage implements Serializable { - private static final long serialVersionUID = 0L; + private static final long serialVersionUID = 1L; - private CanonicalSubscription mySubscription; - private IBaseResource myPayoad; - private IIdType myPayloadId; + private transient CanonicalSubscription mySubscription; + private String mySubscriptionString; + private transient IBaseResource myPayload; + private String myPayoadString; + private String myPayloadId; private RestOperationTypeEnum myOperationType; public RestOperationTypeEnum getOperationType() { @@ -43,28 +47,45 @@ public class ResourceDeliveryMessage implements Serializable { myOperationType = theOperationType; } - public IIdType getPayloadId() { - return myPayloadId; + public IBaseResource getPayload(FhirContext theCtx) { + if (myPayload == null && myPayoadString != null) { + myPayload = theCtx.newJsonParser().parseResource(myPayoadString); + } + return myPayload; } - public void setPayloadId(IIdType thePayloadId) { - myPayloadId = thePayloadId; - } - - public IBaseResource getPayload() { - return myPayoad; - } - - public void setPayload(IBaseResource thePayload) { - myPayoad = thePayload; + public IIdType getPayloadId(FhirContext theCtx) { + IIdType retVal = null; + if (myPayloadId != null) { + retVal = theCtx.getVersion().newIdType().setValue(myPayloadId); + } + return retVal; } public CanonicalSubscription getSubscription() { + if (mySubscription == null && mySubscriptionString != null) { + mySubscription = new Gson().fromJson(mySubscriptionString, CanonicalSubscription.class); + } return mySubscription; } public void setSubscription(CanonicalSubscription theSubscription) { mySubscription = theSubscription; + if (mySubscription != null) { + mySubscriptionString = new Gson().toJson(mySubscription); + } + } + + public void setPayload(FhirContext theCtx, IBaseResource thePayload) { + myPayload = thePayload; + myPayoadString = theCtx.newJsonParser().encodeResourceToString(thePayload); + } + + public void setPayloadId(IIdType thePayloadId) { + myPayloadId = null; + if (thePayloadId != null) { + myPayloadId = thePayloadId.getValue(); + } } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java index 0f4ce3e194d..a933f7b94fa 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription; * #L% */ +import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -28,21 +29,28 @@ import java.io.Serializable; public class ResourceModifiedMessage implements Serializable { - private static final long serialVersionUID = 0L; + private static final long serialVersionUID = 1L; - private IIdType myId; + private String myId; private RestOperationTypeEnum myOperationType; - private IBaseResource myNewPayload; + private String myNewPayloadEncoded; + private transient IBaseResource myNewPayload; - public IIdType getId() { - return myId; + public IIdType getId(FhirContext theCtx) { + IIdType retVal = null; + if (myId != null) { + retVal = theCtx.getVersion().newIdType().setValue(myId); + } + return retVal; } - public void setId(IIdType theId) { - myId = theId; + public IBaseResource getNewPayload(FhirContext theCtx) { + if (myNewPayload == null && myNewPayloadEncoded != null) { + myNewPayload = theCtx.newJsonParser().parseResource(myNewPayloadEncoded); + } + return myNewPayload; } - public RestOperationTypeEnum getOperationType() { return myOperationType; } @@ -51,11 +59,15 @@ public class ResourceModifiedMessage implements Serializable { myOperationType = theOperationType; } - public IBaseResource getNewPayload() { - return myNewPayload; + public void setId(IIdType theId) { + myId = null; + if (theId != null) { + myId = theId.getValue(); + } } - public void setNewPayload(IBaseResource theNewPayload) { + public void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) { myNewPayload = theNewPayload; + myNewPayloadEncoded = theCtx.newJsonParser().encodeResourceToString(theNewPayload); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index fd14b8219b7..8a37edbb3f7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; @@ -35,29 +36,30 @@ import org.springframework.messaging.MessagingException; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("unchecked") -public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber { +public class SubscriptionActivatingSubscriber { + private final IFhirResourceDao mySubscriptionDao; + private final BaseSubscriptionInterceptor mySubscriptionInterceptor; private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); + private FhirContext myCtx; + private Subscription.SubscriptionChannelType myChannelType; /** * Constructor */ public SubscriptionActivatingSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) { - super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor); - } - - private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) { - IBaseResource subscription = theMsg.getNewPayload(); - activateAndRegisterSubscriptionIfRequired(subscription); + mySubscriptionDao = theSubscriptionDao; + mySubscriptionInterceptor = theSubscriptionInterceptor; + myChannelType = theChannelType; + myCtx = theSubscriptionDao.getContext(); } public void activateAndRegisterSubscriptionIfRequired(IBaseResource theSubscription) { - boolean subscriptionTypeApplies = subscriptionTypeApplies(theSubscription); + boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(myCtx, theSubscription, myChannelType); if (subscriptionTypeApplies == false) { return; } - FhirContext ctx = getSubscriptionDao().getContext(); - IPrimitiveType status = ctx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class); + IPrimitiveType status = myCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class); String statusString = status.getValueAsString(); String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode(); @@ -65,59 +67,37 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber if (requestedStatus.equals(statusString)) { status.setValueAsString(activeStatus); ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus); - getSubscriptionDao().update(theSubscription); - getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription); + mySubscriptionDao.update(theSubscription); + mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription); } else if (activeStatus.equals(statusString)) { - if (!getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) { + if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); } - getSubscriptionInterceptor().registerSubscription(theSubscription.getIdElement(), theSubscription); + mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription); } else { - if (getSubscriptionInterceptor().hasSubscription(theSubscription.getIdElement())) { + if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue()); } - getSubscriptionInterceptor().unregisterSubscription(theSubscription.getIdElement()); + mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement()); } } - private void handleCreate(ResourceModifiedMessage theMsg) { - if (!theMsg.getId().getResourceType().equals("Subscription")) { - return; - } + public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, IBaseResource theSubscription) throws MessagingException { - activateAndRegisterSubscriptionIfRequired(theMsg); - } - - @Override - public void handleMessage(Message theMessage) throws MessagingException { - - if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) { - return; - } - - ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload(); - IIdType id = msg.getId(); - - switch (msg.getOperationType()) { + switch (theOperationType) { case DELETE: - getSubscriptionInterceptor().unregisterSubscription(id); + mySubscriptionInterceptor.unregisterSubscription(theId); return; case CREATE: - handleCreate(msg); - break; case UPDATE: - handleUpdate(msg); + if (!theId.getResourceType().equals("Subscription")) { + return; + } + activateAndRegisterSubscriptionIfRequired(theSubscription); break; } } - private void handleUpdate(ResourceModifiedMessage theMsg) { - if (!theMsg.getId().getResourceType().equals("Subscription")) { - return; - } - - activateAndRegisterSubscriptionIfRequired(theMsg); - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java index fea24c77a5a..f1be343b66e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java @@ -29,6 +29,7 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.RequestDetails; import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.utilities.ucum.Canonical; @@ -68,8 +69,9 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { return; } - String resourceType = msg.getId().getResourceType(); - String resourceId = msg.getId().getIdPart(); + IIdType id = msg.getId(getContext()); + String resourceType = id.getResourceType(); + String resourceId = id.getIdPart(); List subscriptions = getSubscriptionInterceptor().getSubscriptions(); @@ -77,7 +79,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { for (CanonicalSubscription nextSubscription : subscriptions) { - String nextSubscriptionId = nextSubscription.getIdElement().toUnqualifiedVersionless().getValue(); + String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue(); String nextCriteriaString = nextSubscription.getCriteriaString(); if (StringUtils.isBlank(nextCriteriaString)) { @@ -115,10 +117,10 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { ourLog.info("Found match: queueing rest-hook notification for resource: {}", nextBase.getIdElement()); ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); - deliveryMsg.setPayload(nextBase); + deliveryMsg.setPayload(getContext(), nextBase); deliveryMsg.setSubscription(nextSubscription); deliveryMsg.setOperationType(msg.getOperationType()); - deliveryMsg.setPayloadId(msg.getId()); + deliveryMsg.setPayloadId(msg.getId(getContext())); getSubscriptionInterceptor().getDeliveryChannel().send(new GenericMessage<>(deliveryMsg)); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java index fff5bdc58c4..472b80b8922 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java @@ -49,7 +49,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe } protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) { - IBaseResource payloadResource = theMsg.getPayload(); + IBaseResource payloadResource = theMsg.getPayload(getContext()); IClientExecutable operation; switch (theMsg.getOperationType()) { @@ -60,7 +60,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe operation = theClient.update().resource(payloadResource); break; case DELETE: - operation = theClient.delete().resourceById(theMsg.getPayloadId()); + operation = theClient.delete().resourceById(theMsg.getPayloadId(getContext())); break; default: ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType()); @@ -69,7 +69,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe operation.encoded(thePayloadType); - ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement().toUnqualifiedVersionless().getValue()); + ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue()); operation.execute(); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java index ad062504864..e56631c7a5a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketHandler.java @@ -42,9 +42,10 @@ import java.util.Map; public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); - private static FhirContext ourCtx; @Autowired private SubscriptionWebsocketInterceptor mySubscriptionWebsocketInterceptor; + @Autowired + private FhirContext myCtx; private IState myState = new InitialState(); @@ -118,7 +119,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement private void deliver() { try { - String payload = "ping " + mySubscription.getIdElement().getIdPart(); + String payload = "ping " + mySubscription.getIdElement(myCtx).getIdPart(); ourLog.info("Sending WebSocket message: {}", payload); mySession.sendMessage(new TextMessage(payload)); } catch (IOException e) { @@ -176,7 +177,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement try { Map idToSubscription = mySubscriptionWebsocketInterceptor.getIdToSubscription(); CanonicalSubscription subscription = idToSubscription.get(id.getIdPart()); - myState = new BoundStaticSubscipriptionState(theSession, subscription); + myState = new BoundStaticSubscipriptionState( theSession, subscription); } catch (ResourceNotFoundException e) { try { String message = "Invalid bind request - Unknown subscription: " + id.getValue(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderInterceptorDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderInterceptorDstu2Test.java index fd25e30e56e..5c7f593b850 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderInterceptorDstu2Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/ResourceProviderInterceptorDstu2Test.java @@ -171,7 +171,7 @@ public class ResourceProviderInterceptorDstu2Test extends BaseResourceProviderDs ardCaptor = ArgumentCaptor.forClass(ActionRequestDetails.class); opTypeCaptor = ArgumentCaptor.forClass(RestOperationTypeEnum.class); - verify(myDaoInterceptor, atLeast(3)).incomingRequestPreHandled(opTypeCaptor.capture(), ardCaptor.capture()); + verify(myDaoInterceptor, atLeast(2)).incomingRequestPreHandled(opTypeCaptor.capture(), ardCaptor.capture()); assertEquals(RestOperationTypeEnum.TRANSACTION, opTypeCaptor.getAllValues().get(0)); assertEquals("Bundle", ardCaptor.getAllValues().get(0).getResourceType()); assertNotNull(ardCaptor.getAllValues().get(0).getResource()); diff --git a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/ReferenceClientTest.java b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/ReferenceClientTest.java index 19fd0306918..50327945cd3 100644 --- a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/ReferenceClientTest.java +++ b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/ReferenceClientTest.java @@ -111,7 +111,7 @@ public class ReferenceClientTest { assertEquals(HttpGet.class, capt.getValue().getClass()); HttpGet get = (HttpGet) capt.getValue(); - assertEquals("http://foo/Patient?general-practitioner%3AOrganization=123", get.getURI().toString()); + assertEquals("http://foo/Patient?general-practitioner=Organization%2F123", get.getURI().toString()); } private String createBundle() {