Create model for subscription messages

This commit is contained in:
James Agnew 2017-09-23 10:14:50 -04:00
parent c115fbfd15
commit 478dc1d507
7 changed files with 122 additions and 38 deletions

View File

@ -26,48 +26,27 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import java.io.Serializable;
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class SimpleJsonMessage<T> implements Message<T>, Serializable { public abstract class BaseJsonMessage<T> implements Message<T> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@JsonProperty("payload")
private T myPayload;
@JsonProperty("headers") @JsonProperty("headers")
private MessageHeaders myHeaders; private MessageHeaders myHeaders;
/** /**
* Constructor * Constructor
*/ */
public SimpleJsonMessage(T thePayload) { public BaseJsonMessage() {
myPayload = thePayload;
}
/**
* Constructor
*/
public SimpleJsonMessage() {
super(); super();
} }
public void setPayload(T thePayload) {
myPayload = thePayload;
}
public void setHeaders(MessageHeaders theHeaders) {
myHeaders = theHeaders;
}
@Override
public T getPayload() {
return myPayload;
}
@Override @Override
public MessageHeaders getHeaders() { public MessageHeaders getHeaders() {
return myHeaders; return myHeaders;
} }
public void setHeaders(MessageHeaders theHeaders) {
myHeaders = theHeaders;
}
} }

View File

@ -37,6 +37,7 @@ public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptio
@Override @Override
public void handleMessage(Message<?> theMessage) throws MessagingException { public void handleMessage(Message<?> theMessage) throws MessagingException {
if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
ourLog.warn("Unexpected payload type: {}", theMessage.getPayload());
return; return;
} }
try { try {

View File

@ -345,7 +345,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
submitResourceModified(msg); submitResourceModified(msg);
} }
protected void sendToProcessingChannel(final SimpleJsonMessage<ResourceModifiedMessage> theMessage) { protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Registering synchronization to send resource modified message to processing channel"); ourLog.trace("Registering synchronization to send resource modified message to processing channel");
/* /*
@ -355,7 +355,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@Override @Override
public void afterCommit() { public void afterCommit() {
ourLog.trace("Sending resource modified message to processing channel"); ourLog.trace("Sending resource modified message to processing channel");
getProcessingChannel().send(theMessage); getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
} }
}); });
} }
@ -456,9 +456,8 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
} }
protected void submitResourceModified(final ResourceModifiedMessage theMsg) { protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
final SimpleJsonMessage<ResourceModifiedMessage> message = new SimpleJsonMessage<>(theMsg);
mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx)); mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
sendToProcessingChannel(message); sendToProcessingChannel(theMsg);
} }
protected abstract void unregisterDeliverySubscriber(); protected abstract void unregisterDeliverySubscriber();

View File

@ -0,0 +1,53 @@
package ca.uhn.fhir.jpa.subscription;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2017 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceDeliveryJsonMessage extends BaseJsonMessage<ResourceDeliveryMessage> {
@JsonProperty("payload")
private ResourceDeliveryMessage myPayload;
/**
* Constructor
*/
public ResourceDeliveryJsonMessage() {
super();
}
/**
* Constructor
*/
public ResourceDeliveryJsonMessage(ResourceDeliveryMessage thePayload) {
myPayload = thePayload;
}
@Override
public ResourceDeliveryMessage getPayload() {
return myPayload;
}
public void setPayload(ResourceDeliveryMessage thePayload) {
myPayload = thePayload;
}
}

View File

@ -0,0 +1,53 @@
package ca.uhn.fhir.jpa.subscription;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2017 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceModifiedJsonMessage extends BaseJsonMessage<ResourceModifiedMessage> {
@JsonProperty("headers")
private ResourceModifiedMessage myPayload;
/**
* Constructor
*/
public ResourceModifiedJsonMessage() {
super();
}
/**
* Constructor
*/
public ResourceModifiedJsonMessage(ResourceModifiedMessage thePayload) {
myPayload = thePayload;
}
@Override
public ResourceModifiedMessage getPayload() {
return myPayload;
}
public void setPayload(ResourceModifiedMessage thePayload) {
myPayload = thePayload;
}
}

View File

@ -29,11 +29,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import java.io.Serializable;
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceModifiedMessage implements Serializable { public class ResourceModifiedMessage {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -49,12 +49,12 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
public void handleMessage(Message<?> theMessage) throws MessagingException { public void handleMessage(Message<?> theMessage) throws MessagingException {
ourLog.trace("Handling resource modified message: {}", theMessage); ourLog.trace("Handling resource modified message: {}", theMessage);
if (!(theMessage.getPayload() instanceof ResourceModifiedMessage)) { if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
ourLog.warn("Unexpected message payload type: {}", theMessage.getPayload()); ourLog.warn("Unexpected message payload type: {}", theMessage);
return; return;
} }
ResourceModifiedMessage msg = (ResourceModifiedMessage) theMessage.getPayload(); ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
switch (msg.getOperationType()) { switch (msg.getOperationType()) {
case CREATE: case CREATE:
case UPDATE: case UPDATE:
@ -118,7 +118,8 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
deliveryMsg.setOperationType(msg.getOperationType()); deliveryMsg.setOperationType(msg.getOperationType());
deliveryMsg.setPayloadId(msg.getId(getContext())); deliveryMsg.setPayloadId(msg.getId(getContext()));
getSubscriptionInterceptor().getDeliveryChannel().send(new SimpleJsonMessage<>(deliveryMsg)); ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
getSubscriptionInterceptor().getDeliveryChannel().send(wrappedMsg);
} }
} }