diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java index 800d88b1b48..cec5fb5134f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,12 +27,12 @@ import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.lang.NonNullApi; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber { private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class); - private boolean myReloadResourceBeforeDelivery = true; public BaseSubscriptionDeliverySubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) { super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor); @@ -60,25 +60,20 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio msg.setSubscription(updatedSubscription); } - if (myReloadResourceBeforeDelivery) { - // Reload the payload just in case any interceptors modified - // it before it was saved to the database. This is also - // useful for resources created in a transaction, since they - // can have placeholder IDs in them. - IIdType payloadId = msg.getPayloadId(getContext()); - Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass(); - IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type); - IBaseResource loadedPayload; - try { - loadedPayload = dao.read(payloadId); - } catch (ResourceNotFoundException e) { - // This can happen if a last minute failure happens when saving a resource, - // eg a constraint causes the transaction to roll back on commit - ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue()); - return; - } - msg.setPayload(getContext(), loadedPayload); + // Load the resource + IIdType payloadId = msg.getPayloadId(getContext()); + Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass(); + IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type); + IBaseResource loadedPayload; + try { + loadedPayload = dao.read(payloadId); + } catch (ResourceNotFoundException e) { + // This can happen if a last minute failure happens when saving a resource, + // eg a constraint causes the transaction to roll back on commit + ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue()); + return; } + msg.setPayload(getContext(), loadedPayload); handleMessage(msg); } catch (Exception e) { @@ -90,8 +85,4 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception; - public void setReloadResourceBeforeDelivery(boolean theReloadResourceBeforeDelivery) { - myReloadResourceBeforeDelivery = theReloadResourceBeforeDelivery; - } - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java index bec9191f5f0..5034c26ed9a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import com.fasterxml.jackson.annotation.*; import com.google.gson.Gson; +import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -41,8 +42,6 @@ public class ResourceDeliveryMessage { private String mySubscriptionString; @JsonIgnore private transient IBaseResource myPayload; - @JsonProperty("payload") - private String myPayoadString; @JsonProperty("payloadId") private String myPayloadId; @JsonProperty("operationType") @@ -57,9 +56,7 @@ public class ResourceDeliveryMessage { } public IBaseResource getPayload(FhirContext theCtx) { - if (myPayload == null && myPayoadString != null) { - myPayload = theCtx.newJsonParser().parseResource(myPayoadString); - } + Validate.notNull(myPayload); return myPayload; } @@ -87,7 +84,6 @@ public class ResourceDeliveryMessage { public void setPayload(FhirContext theCtx, IBaseResource thePayload) { myPayload = thePayload; - myPayoadString = theCtx.newJsonParser().encodeResourceToString(thePayload); } public void setPayloadId(IIdType thePayloadId) { diff --git a/pom.xml b/pom.xml index 9de78fa9f31..5792e8842b5 100644 --- a/pom.xml +++ b/pom.xml @@ -1431,7 +1431,7 @@ org.codehaus.mojo versions-maven-plugin - 2.5 + 2.6-SNAPSHOT false diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 076c8ee11e5..a97c12ab28f 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -244,6 +244,11 @@ a resource could be associated with the wrong entry in the response. Thanks to GitHub user @jbalbien for the pull request! + + JPA subscription delivery queues no longer store the resource body in the + queue (only the ID), which should reduce the memory/disk footprint of the queue + when it grows long. +