Added content message delivery for SubscriptionWebsocketHandler (#5692)

* Added content message delivery for SubscriptionWebsocketHandler

* 5687: Code clean up and small bug fix on empty payload

* 5687: Provided unit tests for the SubscriptionWebsocketHandler with subscription topic content: id-only, empty and full-resource

* 5687: Apply mvn spotless:apply

* 5687: Code formatting

* Credit for #5692

---------

Co-authored-by: artiom.darie <artiom.darie@adswizz.com>
Co-authored-by: Artiom Darie <5781864-artiom.darie@users.noreply.gitlab.com>
Co-authored-by: James Agnew <jamesagnew@gmail.com>
This commit is contained in:
Artiom Darie 2024-02-26 15:54:04 +02:00 committed by GitHub
parent 2c2afd4a83
commit d8f6c10df2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 239 additions and 10 deletions

View File

@ -0,0 +1,5 @@
---
type: add
issue: 5692
title: "The JPA WebSocket delivery mechanism now supports the `content` delivery mode.
Thanks to Artiom Darie for the contribution!"

View File

@ -41,9 +41,10 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements WebSocketHandler { public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements WebSocketHandler {
private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
@Autowired @Autowired
protected WebsocketConnectionValidator myWebsocketConnectionValidator; protected WebsocketConnectionValidator myWebsocketConnectionValidator;
@ -51,6 +52,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
@Autowired @Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry; SubscriptionChannelRegistry mySubscriptionChannelRegistry;
private IState myState = new InitialState();
/** /**
* Constructor * Constructor
*/ */
@ -58,8 +61,6 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
super(); super();
} }
private IState myState = new InitialState();
@Override @Override
public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception { public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception {
super.afterConnectionClosed(theSession, theStatus); super.afterConnectionClosed(theSession, theStatus);
@ -130,10 +131,17 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
subscriptionChannelWithHandlers.removeHandler(this); subscriptionChannelWithHandlers.removeHandler(this);
} }
private void deliver() { /**
* Send the payload to the client
*
* @param payload The payload
*/
private void deliver(String payload) {
try { try {
String payload = "ping " + myActiveSubscription.getId(); // Log it
ourLog.info("Sending WebSocket message: {}", payload); ourLog.info("Sending WebSocket message: {}", payload);
// Send message
mySession.sendMessage(new TextMessage(payload)); mySession.sendMessage(new TextMessage(payload));
} catch (IOException e) { } catch (IOException e) {
handleFailure(e); handleFailure(e);
@ -145,15 +153,68 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
return; return;
} }
try { try {
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
if (myActiveSubscription.getSubscription().equals(msg.getSubscription())) { handleSubscriptionPayload(msg);
deliver();
}
} catch (Exception e) { } catch (Exception e) {
handleException(theMessage, e);
}
}
/**
* Handle the subscription payload
*
* @param msg The message
*/
private void handleSubscriptionPayload(ResourceDeliveryMessage msg) {
// Check if the subscription exists and is the same as the active subscription
if (!myActiveSubscription.getSubscription().equals(msg.getSubscription())) {
return;
}
// Default payload
String defaultPayload = "ping " + myActiveSubscription.getId();
String payload = defaultPayload;
// Check if the subscription is a topic subscription
if (msg.getSubscription().isTopicSubscription()) {
// Get the payload by content
payload = getPayloadByContent(msg).orElse(defaultPayload);
}
// Deliver the payload
deliver(payload);
}
/**
* Handle the exception
*
* @param theMessage The message
* @param e The exception
*/
private void handleException(Message<?> theMessage, Exception e) {
ourLog.error("Failure handling subscription payload", e); ourLog.error("Failure handling subscription payload", e);
throw new MessagingException(theMessage, Msg.code(6) + "Failure handling subscription payload", e); throw new MessagingException(theMessage, Msg.code(6) + "Failure handling subscription payload", e);
} }
/**
* Get the payload based on the subscription content
*
* @param msg The message
* @return The payload
*/
private Optional<String> getPayloadByContent(ResourceDeliveryMessage msg) {
switch (msg.getSubscription().getContent()) {
case IDONLY:
return Optional.of(msg.getPayloadId());
case FULLRESOURCE:
return Optional.of(msg.getPayloadString());
case EMPTY:
case NULL:
default:
return Optional.empty();
}
} }
@Override @Override

View File

@ -0,0 +1,159 @@
package ca.uhn.fhir.jpa.subscription.websocket;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR5Test;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.util.WebsocketSubscriptionClient;
import ca.uhn.fhir.rest.api.MethodOutcome;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Patient;
import org.hl7.fhir.r5.model.Subscription;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.UUID;
import static org.awaitility.Awaitility.await;
/**
* Test {@link ca.uhn.fhir.jpa.subscription.match.deliver.websocket.SubscriptionWebsocketHandler} with different content types.
*/
public class WebsocketWithSubscriptionIdR5Test extends BaseSubscriptionsR5Test {
private static final Logger ourLog = org.slf4j.LoggerFactory.getLogger(WebsocketWithSubscriptionIdR5Test.class);
@RegisterExtension
private final WebsocketSubscriptionClient myWebsocketClientExtension =
new WebsocketSubscriptionClient(() -> myServer, () -> myStorageSettings);
@Autowired
private SubscriptionTestUtil mySubscriptionTestUtil;
@Override
@BeforeEach
public void before() {
// Register interceptor
mySubscriptionTestUtil.registerWebSocketInterceptor();
mySubscriptionTestUtil.registerSubscriptionLoggingInterceptor();
// Given a subscription topic
SubscriptionTopic subscriptionTopic = new SubscriptionTopic();
subscriptionTopic.setUrl("Topic/123");
subscriptionTopic.setStatus(Enumerations.PublicationStatus.ACTIVE);
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = subscriptionTopic.addResourceTrigger();
trigger.setResource("Patient");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.CREATE);
myClient.create().resource(subscriptionTopic).execute();
}
@Override
@AfterEach
public void after() throws Exception {
// Unregister interceptor
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
myWebsocketClientExtension.afterEach(null);
}
@Test
public void testSubscriptionMessagePayloadContentIsEmpty() {
// Given a subscription
Subscription subscription = new Subscription();
subscription.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscription.setContent(Subscription.SubscriptionPayloadContent.fromCode("empty"));
subscription.setTopic("Topic/123");
subscription.getChannelType().setCode("websocket");
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
String subscriptionId = methodOutcome.getId().getIdPart();
// When
myWebsocketClientExtension.bind(subscriptionId);
// And
// Trigger resource creation
Patient patient = new Patient();
patient.setActive(true);
myClient.create().resource(patient).execute();
// Then
List<String> messages = myWebsocketClientExtension.getMessages();
await().until(() -> !messages.isEmpty());
// Log it
ourLog.info("Messages: {}", messages);
// Verify a ping message shall be returned
Assertions.assertTrue(messages.contains("ping " + subscriptionId));
}
@Test
public void testSubscriptionMessagePayloadContentIsIdOnly() {
// Given a subscription
Subscription subscription = new Subscription();
subscription.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscription.setContent(Subscription.SubscriptionPayloadContent.fromCode("id-only"));
subscription.setTopic("Topic/123");
subscription.getChannelType().setCode("websocket");
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
String subscriptionId = methodOutcome.getId().getIdPart();
// When
myWebsocketClientExtension.bind(subscriptionId);
// And
// Trigger resource creation
Patient patient = new Patient();
patient.setActive(true);
myClient.create().resource(patient).execute();
// Then
List<String> messages = myWebsocketClientExtension.getMessages();
await().until(() -> messages.size() > 1);
// Log it
ourLog.info("Messages: {}", messages);
// Verify UUID shall be returned
Assertions.assertTrue(messages.contains("bound " + subscriptionId));
Assertions.assertNotNull(UUID.fromString(messages.get(1)));
}
@Test
public void testSubscriptionMessagePayloadContentIsFullResource() {
// Given a subscription
Subscription subscription = new Subscription();
subscription.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscription.setContent(Subscription.SubscriptionPayloadContent.fromCode("full-resource"));
subscription.setTopic("Topic/123");
subscription.getChannelType().setCode("websocket");
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
String subscriptionId = methodOutcome.getId().getIdPart();
// When
myWebsocketClientExtension.bind(subscriptionId);
// And
// Trigger resource creation
Patient patient = new Patient();
patient.setActive(true);
myClient.create().resource(patient).execute();
// Then
List<String> messages = myWebsocketClientExtension.getMessages();
await().until(() -> messages.size() > 1);
// Log it
ourLog.info("Messages: {}", messages);
// Verify Bundle resource shall be returned
Assertions.assertTrue(messages.contains("bound " + subscriptionId));
Assertions.assertNotNull(myFhirContext.newJsonParser().parseResource(messages.get(1)));
}
}

View File

@ -892,6 +892,10 @@
<name>Max Bureck</name> <name>Max Bureck</name>
<organization>Fraunhofer FOKUS</organization> <organization>Fraunhofer FOKUS</organization>
</developer> </developer>
<developer>
<id>ArtyomyuS</id>
<name>Artiom Darie</name>
</developer>
<developer> <developer>
<id>pano-smals</id> <id>pano-smals</id>
<name>pano-smals</name> <name>pano-smals</name>