Websocket cleanup (#1275)

* fix bug in websocket subscription (It wasn't destroying the channel when there are no subscribers)

* add support for removing channel.  Also synchronize removal (there was a race condition between sync and queue)

* keep deprecated method for backwards compatibility

* make websocket endpoint configurable

* make websocket context path configurable

* make websocket context path configurable

* trying mvn clean test instead of mvn clean install to see if the build goes faster

* that didn't work at all.  reverting.

* change log
This commit is contained in:
Ken Stevens 2019-04-17 15:03:35 -04:00 committed by GitHub
parent f6335ebd83
commit a57e50317d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 92 additions and 29 deletions

0
.travis.yml Normal file → Executable file
View File

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.config;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.module.subscriber.websocket.SubscriptionWebsocketHandler; import ca.uhn.fhir.jpa.subscription.module.subscriber.websocket.SubscriptionWebsocketHandler;
import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
@ -35,10 +37,12 @@ import org.springframework.web.socket.handler.PerConnectionWebSocketHandler;
@EnableWebSocket() @EnableWebSocket()
@Controller @Controller
public class WebsocketDispatcherConfig implements WebSocketConfigurer { public class WebsocketDispatcherConfig implements WebSocketConfigurer {
@Autowired
ModelConfig myModelConfig;
@Override @Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry theRegistry) { public void registerWebSocketHandlers(WebSocketHandlerRegistry theRegistry) {
theRegistry.addHandler(subscriptionWebSocketHandler(), "/websocket").setAllowedOrigins("*"); theRegistry.addHandler(subscriptionWebSocketHandler(), myModelConfig.getWebsocketContextPath()).setAllowedOrigins("*");
} }
@Bean(autowire = Autowire.BY_TYPE) @Bean(autowire = Autowire.BY_TYPE)

View File

@ -1580,6 +1580,21 @@ public class DaoConfig {
myModelConfig.setEmailFromAddress(theEmailFromAddress); myModelConfig.setEmailFromAddress(theEmailFromAddress);
} }
/**
* If websocket subscriptions are enabled, this defines the context path that listens to them. Default value "/websocket".
*/
public String getWebsocketContextPath() {
return myModelConfig.getWebsocketContextPath();
}
/**
* If websocket subscriptions are enabled, this defines the context path that listens to them. Default value "/websocket".
*/
public void setWebsocketContextPath(String theWebsocketContextPath) {
myModelConfig.setWebsocketContextPath(theWebsocketContextPath);
}
public enum IndexEnabledEnum { public enum IndexEnabledEnum {
ENABLED, ENABLED,

View File

@ -27,8 +27,6 @@ public class SubscriptionsDstu3Test extends BaseResourceProviderDstu3Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsDstu3Test.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsDstu3Test.class);
private static final String WEBSOCKET_PATH = "/websocket/dstu3";
@Override @Override
public void beforeCreateInterceptor() { public void beforeCreateInterceptor() {
super.beforeCreateInterceptor(); super.beforeCreateInterceptor();

View File

@ -27,8 +27,6 @@ public class SubscriptionsR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsR4Test.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionsR4Test.class);
private static final String WEBSOCKET_PATH = "/websocket/r4";
@Override @Override
public void beforeCreateInterceptor() { public void beforeCreateInterceptor() {
super.beforeCreateInterceptor(); super.beforeCreateInterceptor();

View File

@ -109,7 +109,7 @@ public class WebsocketWithSubscriptionIdDstu2Test extends BaseResourceProviderDs
mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON); mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON);
myWebSocketClient.start(); myWebSocketClient.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/websocket"); URI echoUri = new URI("ws://localhost:" + ourPort + myModelConfig.getWebsocketContextPath());
ClientUpgradeRequest request = new ClientUpgradeRequest(); ClientUpgradeRequest request = new ClientUpgradeRequest();
ourLog.info("Connecting to : {}", echoUri); ourLog.info("Connecting to : {}", echoUri);
Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request); Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request);

View File

@ -107,7 +107,7 @@ public class WebsocketWithSubscriptionIdDstu3Test extends BaseResourceProviderDs
mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON); mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON);
myWebSocketClient.start(); myWebSocketClient.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/websocket"); URI echoUri = new URI("ws://localhost:" + ourPort + myModelConfig.getWebsocketContextPath());
ClientUpgradeRequest request = new ClientUpgradeRequest(); ClientUpgradeRequest request = new ClientUpgradeRequest();
ourLog.info("Connecting to : {}", echoUri); ourLog.info("Connecting to : {}", echoUri);
Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request); Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request);

View File

@ -105,7 +105,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes
mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON); mySocketImplementation = new SocketImplementation(mySubscriptionId, EncodingEnum.JSON);
myWebSocketClient.start(); myWebSocketClient.start();
URI echoUri = new URI("ws://localhost:" + ourPort + "/websocket"); URI echoUri = new URI("ws://localhost:" + ourPort + myModelConfig.getWebsocketContextPath());
ClientUpgradeRequest request = new ClientUpgradeRequest(); ClientUpgradeRequest request = new ClientUpgradeRequest();
ourLog.info("Connecting to : {}", echoUri); ourLog.info("Connecting to : {}", echoUri);
Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request); Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request);

View File

@ -47,6 +47,7 @@ public class ModelConfig {
"http://hl7.org/fhir/codesystem-*", "http://hl7.org/fhir/codesystem-*",
"http://hl7.org/fhir/StructureDefinition/*"))); "http://hl7.org/fhir/StructureDefinition/*")));
public static final String DEFAULT_WEBSOCKET_CONTEXT_PATH = "/websocket";
/** /**
* update setter javadoc if default changes * update setter javadoc if default changes
*/ */
@ -58,6 +59,7 @@ public class ModelConfig {
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>(); private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private String myEmailFromAddress = "noreply@unknown.com"; private String myEmailFromAddress = "noreply@unknown.com";
private boolean mySubscriptionMatchingEnabled = true; private boolean mySubscriptionMatchingEnabled = true;
private String myWebsocketContextPath = DEFAULT_WEBSOCKET_CONTEXT_PATH;
/** /**
* If set to {@code true} the default search params (i.e. the search parameters that are * If set to {@code true} the default search params (i.e. the search parameters that are
@ -363,6 +365,22 @@ public class ModelConfig {
myEmailFromAddress = theEmailFromAddress; myEmailFromAddress = theEmailFromAddress;
} }
/**
* If websocket subscriptions are enabled, this specifies the context path that listens to them. Default value "/websocket".
*/
public String getWebsocketContextPath() {
return myWebsocketContextPath;
}
/**
* If websocket subscriptions are enabled, this specifies the context path that listens to them. Default value "/websocket".
*/
public void setWebsocketContextPath(String theWebsocketContextPath) {
myWebsocketContextPath = theWebsocketContextPath;
}
private static void validateTreatBaseUrlsAsLocal(String theUrl) { private static void validateTreatBaseUrlsAsLocal(String theUrl) {
Validate.notBlank(theUrl, "Base URL must not be null or empty"); Validate.notBlank(theUrl, "Base URL must not be null or empty");
@ -374,5 +392,4 @@ public class ModelConfig {
} }
} }
} }

View File

@ -61,7 +61,7 @@ public class Retrier<T> {
@Override @Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
super.onError(context, callback, throwable); super.onError(context, callback, throwable);
ourLog.error("Retry failure " + context.getRetryCount() + "/" + theMaxRetries, throwable); ourLog.error("Retry failure {}/{}: {}", context.getRetryCount(), theMaxRetries, throwable.getMessage());
} }
}; };
myRetryTemplate.registerListener(listener); myRetryTemplate.registerListener(listener);

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -30,10 +30,11 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import java.io.Closeable;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
public class ActiveSubscription { public class ActiveSubscription implements Closeable {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private CanonicalSubscription mySubscription; private CanonicalSubscription mySubscription;
@ -62,20 +63,6 @@ public class ActiveSubscription {
public void unregister(MessageHandler theMessageHandler) { public void unregister(MessageHandler theMessageHandler) {
if (mySubscribableChannel != null) { if (mySubscribableChannel != null) {
mySubscribableChannel.unsubscribe(theMessageHandler); mySubscribableChannel.unsubscribe(theMessageHandler);
if (mySubscribableChannel instanceof DisposableBean) {
try {
((DisposableBean) mySubscribableChannel).destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
}
}
public void unregisterAll() {
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
unregister(messageHandler);
} }
} }
@ -103,4 +90,33 @@ public class ActiveSubscription {
public void setFlagForDeletion(boolean theFlagForDeletion) { public void setFlagForDeletion(boolean theFlagForDeletion) {
flagForDeletion = theFlagForDeletion; flagForDeletion = theFlagForDeletion;
} }
@Override
public void close() {
for (MessageHandler messageHandler : myDeliveryHandlerSet) {
unregister(messageHandler);
}
if (mySubscribableChannel instanceof DisposableBean) {
try {
((DisposableBean) mySubscribableChannel).destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
}
public void removeChannel() {
if (mySubscribableChannel instanceof IRemovableChannel) {
((IRemovableChannel)mySubscribableChannel).removeChannel();
}
}
/**
* Use close() instead
* KHS 15 Apr 2019
*/
@Deprecated
public void unregisterAll() {
close();
}
} }

View File

@ -51,7 +51,7 @@ class ActiveSubscriptionCache {
myCache.put(theSubscriptionId, theValue); myCache.put(theSubscriptionId, theValue);
} }
public void remove(String theSubscriptionId) { public synchronized void remove(String theSubscriptionId) {
Validate.notBlank(theSubscriptionId); Validate.notBlank(theSubscriptionId);
ActiveSubscription activeSubscription = myCache.get(theSubscriptionId); ActiveSubscription activeSubscription = myCache.get(theSubscriptionId);
@ -59,7 +59,8 @@ class ActiveSubscriptionCache {
return; return;
} }
activeSubscription.unregisterAll(); activeSubscription.close();
activeSubscription.removeChannel();
myCache.remove(theSubscriptionId); myCache.remove(theSubscriptionId);
} }

View File

@ -0,0 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
public interface IRemovableChannel {
void removeChannel();
}

View File

@ -151,6 +151,15 @@
The JPA server failed to index R4 reources with search parameters pointing to the Money data type. The JPA server failed to index R4 reources with search parameters pointing to the Money data type.
Thanks to GitHub user @navyflower for reporting! Thanks to GitHub user @navyflower for reporting!
</action> </action>
<action type="add">
Added new configuration parameter to DaoConfig and ModelConfig to specify the websocket context path.
(Before it was hardcoded to "/websocket").
</action>
<action type="add">
Added new IRemovableChannel interface. If a SubscriptionChannel implements this, then when a subscription
channel is destroyed (because its subscription is deleted) then the remove() method will be called on that
channel.
</action>
</release> </release>
<release version="3.7.0" date="2019-02-06" description="Gale"> <release version="3.7.0" date="2019-02-06" description="Gale">
<action type="add"> <action type="add">