Dont keep resource bodies in the queue for JPA subscription delivery
This commit is contained in:
parent
cffe4c9df2
commit
d07fd76d60
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
|
|||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -27,12 +27,12 @@ import org.hl7.fhir.instance.model.api.IIdType;
|
|||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.lang.NonNullApi;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
|
||||
private boolean myReloadResourceBeforeDelivery = true;
|
||||
|
||||
public BaseSubscriptionDeliverySubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
|
||||
super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
|
||||
|
@ -60,25 +60,20 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
|
|||
msg.setSubscription(updatedSubscription);
|
||||
}
|
||||
|
||||
if (myReloadResourceBeforeDelivery) {
|
||||
// Reload the payload just in case any interceptors modified
|
||||
// it before it was saved to the database. This is also
|
||||
// useful for resources created in a transaction, since they
|
||||
// can have placeholder IDs in them.
|
||||
IIdType payloadId = msg.getPayloadId(getContext());
|
||||
Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
|
||||
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
|
||||
IBaseResource loadedPayload;
|
||||
try {
|
||||
loadedPayload = dao.read(payloadId);
|
||||
} catch (ResourceNotFoundException e) {
|
||||
// This can happen if a last minute failure happens when saving a resource,
|
||||
// eg a constraint causes the transaction to roll back on commit
|
||||
ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
|
||||
return;
|
||||
}
|
||||
msg.setPayload(getContext(), loadedPayload);
|
||||
// Load the resource
|
||||
IIdType payloadId = msg.getPayloadId(getContext());
|
||||
Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
|
||||
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
|
||||
IBaseResource loadedPayload;
|
||||
try {
|
||||
loadedPayload = dao.read(payloadId);
|
||||
} catch (ResourceNotFoundException e) {
|
||||
// This can happen if a last minute failure happens when saving a resource,
|
||||
// eg a constraint causes the transaction to roll back on commit
|
||||
ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
|
||||
return;
|
||||
}
|
||||
msg.setPayload(getContext(), loadedPayload);
|
||||
|
||||
handleMessage(msg);
|
||||
} catch (Exception e) {
|
||||
|
@ -90,8 +85,4 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
|
|||
|
||||
public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
|
||||
|
||||
public void setReloadResourceBeforeDelivery(boolean theReloadResourceBeforeDelivery) {
|
||||
myReloadResourceBeforeDelivery = theReloadResourceBeforeDelivery;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import ca.uhn.fhir.context.FhirContext;
|
|||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||
import com.fasterxml.jackson.annotation.*;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
|
@ -41,8 +42,6 @@ public class ResourceDeliveryMessage {
|
|||
private String mySubscriptionString;
|
||||
@JsonIgnore
|
||||
private transient IBaseResource myPayload;
|
||||
@JsonProperty("payload")
|
||||
private String myPayoadString;
|
||||
@JsonProperty("payloadId")
|
||||
private String myPayloadId;
|
||||
@JsonProperty("operationType")
|
||||
|
@ -57,9 +56,7 @@ public class ResourceDeliveryMessage {
|
|||
}
|
||||
|
||||
public IBaseResource getPayload(FhirContext theCtx) {
|
||||
if (myPayload == null && myPayoadString != null) {
|
||||
myPayload = theCtx.newJsonParser().parseResource(myPayoadString);
|
||||
}
|
||||
Validate.notNull(myPayload);
|
||||
return myPayload;
|
||||
}
|
||||
|
||||
|
@ -87,7 +84,6 @@ public class ResourceDeliveryMessage {
|
|||
|
||||
public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
|
||||
myPayload = thePayload;
|
||||
myPayoadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
|
||||
}
|
||||
|
||||
public void setPayloadId(IIdType thePayloadId) {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -1431,7 +1431,7 @@
|
|||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>versions-maven-plugin</artifactId>
|
||||
<version>2.5</version>
|
||||
<version>2.6-SNAPSHOT</version>
|
||||
<configuration>
|
||||
<processDependencyManagementTransitive>false</processDependencyManagementTransitive>
|
||||
</configuration>
|
||||
|
|
|
@ -244,6 +244,11 @@
|
|||
a resource could be associated with the wrong entry in the response.
|
||||
Thanks to GitHub user @jbalbien for the pull request!
|
||||
</action>
|
||||
<action type="add">
|
||||
JPA subscription delivery queues no longer store the resource body in the
|
||||
queue (only the ID), which should reduce the memory/disk footprint of the queue
|
||||
when it grows long.
|
||||
</action>
|
||||
</release>
|
||||
<release version="3.4.0" date="2018-05-28">
|
||||
<action type="add">
|
||||
|
|
Loading…
Reference in New Issue