diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SimpleJsonMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseJsonMessage.java similarity index 79% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SimpleJsonMessage.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseJsonMessage.java index eaee330ffb1..5c0c8d9b639 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SimpleJsonMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseJsonMessage.java @@ -26,48 +26,27 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; -import java.io.Serializable; - @JsonInclude(JsonInclude.Include.NON_NULL) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class SimpleJsonMessage implements Message, Serializable { +public abstract class BaseJsonMessage implements Message { private static final long serialVersionUID = 1L; - @JsonProperty("payload") - private T myPayload; @JsonProperty("headers") private MessageHeaders myHeaders; /** * Constructor */ - public SimpleJsonMessage(T thePayload) { - myPayload = thePayload; - } - - /** - * Constructor - */ - public SimpleJsonMessage() { + public BaseJsonMessage() { super(); } - public void setPayload(T thePayload) { - - myPayload = thePayload; - } - - public void setHeaders(MessageHeaders theHeaders) { - myHeaders = theHeaders; - } - - @Override - public T getPayload() { - return myPayload; - } - @Override public MessageHeaders getHeaders() { return myHeaders; } + + public void setHeaders(MessageHeaders theHeaders) { + myHeaders = theHeaders; + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java index 6a0bde62437..5799ca6ba60 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionDeliverySubscriber.java @@ -37,6 +37,7 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio @Override public void handleMessage(Message theMessage) throws MessagingException { if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { + ourLog.warn("Unexpected payload type: {}", theMessage.getPayload()); return; } try { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 5b1bf1cae45..3f259e6cf77 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -345,7 +345,7 @@ public abstract class BaseSubscriptionInterceptor exten submitResourceModified(msg); } - protected void sendToProcessingChannel(final SimpleJsonMessage theMessage) { + protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) { ourLog.trace("Registering synchronization to send resource modified message to processing channel"); /* @@ -355,7 +355,7 @@ public abstract class BaseSubscriptionInterceptor exten @Override public void afterCommit() { ourLog.trace("Sending resource modified message to processing channel"); - getProcessingChannel().send(theMessage); + getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage)); } }); } @@ -456,9 +456,8 @@ public abstract class BaseSubscriptionInterceptor exten } protected void submitResourceModified(final ResourceModifiedMessage theMsg) { - final SimpleJsonMessage message = new SimpleJsonMessage<>(theMsg); mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx)); - sendToProcessingChannel(message); + sendToProcessingChannel(theMsg); } protected abstract void unregisterDeliverySubscriber(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryJsonMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryJsonMessage.java new file mode 100644 index 00000000000..a5e274a512f --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryJsonMessage.java @@ -0,0 +1,53 @@ +package ca.uhn.fhir.jpa.subscription; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2017 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class ResourceDeliveryJsonMessage extends BaseJsonMessage { + + @JsonProperty("payload") + private ResourceDeliveryMessage myPayload; + + /** + * Constructor + */ + public ResourceDeliveryJsonMessage() { + super(); + } + + /** + * Constructor + */ + public ResourceDeliveryJsonMessage(ResourceDeliveryMessage thePayload) { + myPayload = thePayload; + } + + @Override + public ResourceDeliveryMessage getPayload() { + return myPayload; + } + + public void setPayload(ResourceDeliveryMessage thePayload) { + myPayload = thePayload; + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedJsonMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedJsonMessage.java new file mode 100644 index 00000000000..e75b29cf604 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedJsonMessage.java @@ -0,0 +1,53 @@ +package ca.uhn.fhir.jpa.subscription; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2017 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class ResourceModifiedJsonMessage extends BaseJsonMessage { + + @JsonProperty("headers") + private ResourceModifiedMessage myPayload; + + /** + * Constructor + */ + public ResourceModifiedJsonMessage() { + super(); + } + + /** + * Constructor + */ + public ResourceModifiedJsonMessage(ResourceModifiedMessage thePayload) { + myPayload = thePayload; + } + + @Override + public ResourceModifiedMessage getPayload() { + return myPayload; + } + + public void setPayload(ResourceModifiedMessage thePayload) { + myPayload = thePayload; + } + +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java index 50e3a77567a..1097a69231b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java @@ -29,11 +29,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import java.io.Serializable; - @JsonInclude(JsonInclude.Include.NON_NULL) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class ResourceModifiedMessage implements Serializable { +public class ResourceModifiedMessage { private static final long serialVersionUID = 1L; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java index 4208165bda5..3822c6a5ec8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java @@ -49,12 +49,12 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { public void handleMessage(Message theMessage) throws MessagingException { ourLog.trace("Handling resource modified message: {}", theMessage); - if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) { - ourLog.warn("Unexpected message payload type: {}", theMessage.getPayload()); + if (!(theMessage instanceof ResourceModifiedJsonMessage)) { + ourLog.warn("Unexpected message payload type: {}", theMessage); return; } - ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload(); + ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload(); switch (msg.getOperationType()) { case CREATE: case UPDATE: @@ -118,7 +118,8 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { deliveryMsg.setOperationType(msg.getOperationType()); deliveryMsg.setPayloadId(msg.getId(getContext())); - getSubscriptionInterceptor().getDeliveryChannel().send(new SimpleJsonMessage<>(deliveryMsg)); + ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); + getSubscriptionInterceptor().getDeliveryChannel().send(wrappedMsg); } }