Add serializable message type
This commit is contained in:
parent
be07ebc4ef
commit
c115fbfd15
|
@ -50,7 +50,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
|||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
@ -346,6 +345,21 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
protected void sendToProcessingChannel(final SimpleJsonMessage<ResourceModifiedMessage> theMessage) {
|
||||
ourLog.trace("Registering synchronization to send resource modified message to processing channel");
|
||||
|
||||
/*
|
||||
* We only actually submit this item work working after the
|
||||
*/
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
ourLog.trace("Sending resource modified message to processing channel");
|
||||
getProcessingChannel().send(theMessage);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void setFhirContext(FhirContext theCtx) {
|
||||
myCtx = theCtx;
|
||||
}
|
||||
|
@ -442,26 +456,11 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
}
|
||||
|
||||
protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
|
||||
final GenericMessage<ResourceModifiedMessage> message = new GenericMessage<>(theMsg);
|
||||
final SimpleJsonMessage<ResourceModifiedMessage> message = new SimpleJsonMessage<>(theMsg);
|
||||
mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
|
||||
sendToProcessingChannel(message);
|
||||
}
|
||||
|
||||
protected void sendToProcessingChannel(final GenericMessage<ResourceModifiedMessage> theMessage) {
|
||||
ourLog.trace("Registering synchronization to send resource modified message to processing channel");
|
||||
|
||||
/*
|
||||
* We only actually submit this item work working after the
|
||||
*/
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
ourLog.trace("Sending resource modified message to processing channel");
|
||||
getProcessingChannel().send(theMessage);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract void unregisterDeliverySubscriber();
|
||||
|
||||
public void unregisterSubscription(IIdType theId) {
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR JPA Server
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2017 University Health Network
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
||||
public class SimpleJsonMessage<T> implements Message<T>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
@JsonProperty("payload")
|
||||
private T myPayload;
|
||||
@JsonProperty("headers")
|
||||
private MessageHeaders myHeaders;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public SimpleJsonMessage(T thePayload) {
|
||||
myPayload = thePayload;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public SimpleJsonMessage() {
|
||||
super();
|
||||
}
|
||||
|
||||
public void setPayload(T thePayload) {
|
||||
|
||||
myPayload = thePayload;
|
||||
}
|
||||
|
||||
public void setHeaders(MessageHeaders theHeaders) {
|
||||
myHeaders = theHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getPayload() {
|
||||
return myPayload;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageHeaders getHeaders() {
|
||||
return myHeaders;
|
||||
}
|
||||
}
|
|
@ -30,16 +30,12 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.hl7.fhir.utilities.ucum.Canonical;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
|
||||
|
@ -122,7 +118,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
|
|||
deliveryMsg.setOperationType(msg.getOperationType());
|
||||
deliveryMsg.setPayloadId(msg.getId(getContext()));
|
||||
|
||||
getSubscriptionInterceptor().getDeliveryChannel().send(new GenericMessage<>(deliveryMsg));
|
||||
getSubscriptionInterceptor().getDeliveryChannel().send(new SimpleJsonMessage<>(deliveryMsg));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue