Ongoing cleanup

This commit is contained in:
jamesagnew 2020-04-06 17:00:45 -04:00
parent 40d0c27ae3
commit b78205c218
24 changed files with 609 additions and 144 deletions

View File

@ -56,7 +56,7 @@ web address.
A common use for logical references is in references to conformance resources, such as ValueSets, StructureDefinitions, etc. For example, you might refer to the ValueSet `http://hl7.org/fhir/ValueSet/quantity-comparator` from your own resources. In this case, you are not necessarily telling the server that this is a real address that it should resolve, but rather that this is an identifier for a ValueSet where `ValueSet.url` has the given URI/URL.
HAPI can be configured to treat certain URI/URL patterns as logical by using the DaoConfig#setTreatReferencesAsLogical property (see [JavaDoc](/hapi-fhir/apidocs/hapi-fhir-jpaserver-base/ca/uhn/fhir/jpa/dao/DaoConfig.html#setTreatReferencesAsLogical(java.util.Set))).
HAPI can be configured to treat certain URI/URL patterns as logical by using the DaoConfig#setTreatReferencesAsLogical property (see [JavaDoc](/hapi-fhir/apidocs/hapi-fhir-jpaserver-api/ca/uhn/fhir/jpa/api/config/DaoConfig.html#setTreatReferencesAsLogical(java.util.Set))).
For example:

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.provider;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu2.BaseJpaDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.model.dstu2.resource.Bundle;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r5;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r5.BaseJpaR5Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;

View File

@ -20,10 +20,6 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* This interface is the factory for Queue Channels, which are the low level abstraction over a
* queue (e.g. memory queue, JMS queue, Kafka stream, etc.) for any purpose.
@ -34,20 +30,38 @@ public interface IQueueChannelFactory {
* Create a channel that is used to receive messages from the queue.
*
* <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, int)}
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p>
*
* @param theChannelName The actual underlying queue name
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and
* {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
* even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first.
*/
SubscribableChannel getOrCreateReceiver(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers);
IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig);
/**
* Create a channel that is used to send messages to the queue.
*
* <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, int)}
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p>
*
* @param theChannelName The actual underlying queue name
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and
* {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
* even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first.
*/
MessageChannel getOrCreateSender(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers);
IQueueChannelSender getOrCreateSender(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig);
}

View File

@ -0,0 +1,27 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IQueueChannelReceiver extends SubscribableChannel, InterceptableChannel {
}

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IQueueChannelSender extends MessageChannel, InterceptableChannel {
}

View File

@ -20,84 +20,14 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor;
public class LinkedBlockingQueueChannel implements SubscribableChannel {
private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueChannel.class);
public class LinkedBlockingQueueChannel extends ExecutorSubscribableChannel implements IQueueChannelSender, IQueueChannelReceiver {
private final ExecutorSubscribableChannel mySubscribableChannel;
private final BlockingQueue<Runnable> myQueue;
public LinkedBlockingQueueChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern, int theConcurrentConsumers) {
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern(theThreadNamingPattern)
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", theQueue.size());
StopWatch sw = new StopWatch();
try {
theQueue.put(theRunnable);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + e.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
theConcurrentConsumers,
0L,
TimeUnit.MILLISECONDS,
theQueue,
threadFactory,
rejectedExecutionHandler);
myQueue = theQueue;
mySubscribableChannel = new ExecutorSubscribableChannel(executor);
public LinkedBlockingQueueChannel(ThreadPoolExecutor theExecutor) {
super(theExecutor);
}
@Override
public boolean subscribe(MessageHandler handler) {
return mySubscribableChannel.subscribe(handler);
}
@Override
public boolean unsubscribe(MessageHandler handler) {
return mySubscribableChannel.unsubscribe(handler);
}
@Override
public boolean send(Message<?> message, long timeout) {
return mySubscribableChannel.send(message, timeout);
}
@VisibleForTesting
public void clearInterceptorsForUnitTest() {
mySubscribableChannel.setInterceptors(new ArrayList<>());
}
@VisibleForTesting
public void addInterceptorForUnitTest(ChannelInterceptor theInterceptor) {
mySubscribableChannel.addInterceptor(theInterceptor);
}
@VisibleForTesting
public int getQueueSizeForUnitTest() {
return myQueue.size();
}
}

View File

@ -21,17 +21,27 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
*/
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.springframework.messaging.MessageChannel;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
private Map<String, SubscribableChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
private Map<String, LinkedBlockingQueueChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueChannelFactory.class);
/**
* Constructor
@ -41,20 +51,49 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
}
@Override
public SubscribableChannel getOrCreateReceiver(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers) {
return getOrCreateChannel(theChannelName, theConcurrentConsumers);
public IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
@Override
public MessageChannel getOrCreateSender(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers) {
return getOrCreateChannel(theChannelName, theConcurrentConsumers);
public IQueueChannelSender getOrCreateSender(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
private SubscribableChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
private LinkedBlockingQueueChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
return myChannels.computeIfAbsent(theChannelName, t -> {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
String threadNamingPattern = theChannelName + "-%d";
return new LinkedBlockingQueueChannel(queue, threadNamingPattern, theConcurrentConsumers);
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern(threadNamingPattern)
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", queue.size());
StopWatch sw = new StopWatch();
try {
queue.put(theRunnable);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + e.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
theConcurrentConsumers,
0L,
TimeUnit.MILLISECONDS,
queue,
threadFactory,
rejectedExecutionHandler);
return new LinkedBlockingQueueChannel(executor);
});
}

View File

@ -0,0 +1,35 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public class QueueChannelConsumerConfig {
private int myConcurrentConsumers;
public int getConcurrentConsumers() {
return myConcurrentConsumers;
}
public void setConcurrentConsumers(int theConcurrentConsumers) {
myConcurrentConsumers = theConcurrentConsumers;
}
}

View File

@ -21,23 +21,22 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
*/
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelSender;
import ca.uhn.fhir.jpa.subscription.channel.queue.QueueChannelConsumerConfig;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
public class SubscriptionChannelFactory {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionChannelFactory.class);
private final IQueueChannelFactory myQueueChannelFactory;
/**
@ -48,24 +47,40 @@ public class SubscriptionChannelFactory {
myQueueChannelFactory = theQueueChannelFactory;
}
public MessageChannel newDeliverySendingChannel(String theChannelName) {
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
public IQueueChannelSender newDeliverySendingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, config);
}
public SubscribableChannel newDeliveryChannel(String theChannelName) {
SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
public IQueueChannelReceiver newDeliveryReceivingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
public MessageChannel newMatchingSendingChannel(String theChannelName) {
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
public IQueueChannelSender newMatchingSendingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForMatchingChannel();
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, config);
}
public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {
SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
public IQueueChannelReceiver newMatchingReceivingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForMatchingChannel();
IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
protected QueueChannelConsumerConfig newConfigForDeliveryChannel() {
QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
return config;
}
protected QueueChannelConsumerConfig newConfigForMatchingChannel() {
QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
return config;
}
public int getDeliveryChannelConcurrentConsumers() {
return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
}
@ -74,12 +89,12 @@ public class SubscriptionChannelFactory {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
}
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean {
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IQueueChannelReceiver, DisposableBean {
private final SubscribableChannel myWrappedChannel;
private final IQueueChannelReceiver myWrappedChannel;
public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) {
theChannel.subscribe(this);
public BroadcastingSubscribableChannelWrapper(IQueueChannelReceiver theChannel) {
theChannel.subscribe(message -> send(message));
myWrappedChannel = theChannel;
}
@ -89,20 +104,10 @@ public class SubscriptionChannelFactory {
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
// try {
for (MessageHandler next : getSubscribers()) {
next.handleMessage(theMessage);
}
return true;
// } catch (Exception e) {
// ourLog.error("Failiure handling message", e);
// return false;
// }
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
send(message);
for (MessageHandler next : getSubscribers()) {
next.handleMessage(theMessage);
}
return true;
}
@Override
@ -112,5 +117,12 @@ public class SubscriptionChannelFactory {
}
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
myWrappedChannel.addInterceptor(interceptor);
}
}
}

View File

@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
@ -32,7 +31,6 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@ -61,7 +59,7 @@ public class SubscriptionChannelRegistry {
return;
}
SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(channelName);
Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);

View File

@ -97,11 +97,6 @@ public class SubscriptionProcessorConfig {
return new DaoResourceRetriever();
}
@Bean
public WebsocketConnectionValidator websocketConnectionValidator() {
return new WebsocketConnectionValidator();
}
@Bean
public SubscriptionLoader subscriptionLoader() {
return new SubscriptionLoader();

View File

@ -1,8 +1,8 @@
package ca.uhn.fhir.jpa.config;
package ca.uhn.fhir.jpa.subscription.process.config;
/*
* #%L
* HAPI FHIR JPA Server
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.SubscriptionWebsocketHandler;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketConnectionValidator;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -37,9 +38,15 @@ import org.springframework.web.socket.handler.PerConnectionWebSocketHandler;
@EnableWebSocket()
@Controller
public class WebsocketDispatcherConfig implements WebSocketConfigurer {
@Autowired
ModelConfig myModelConfig;
@Bean
public WebsocketConnectionValidator websocketConnectionValidator() {
return new WebsocketConnectionValidator();
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry theRegistry) {
theRegistry.addHandler(subscriptionWebSocketHandler(), myModelConfig.getWebsocketContextPath()).setAllowedOrigins("*");

View File

@ -50,6 +50,13 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
@Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
/**
* Constructor
*/
public SubscriptionWebsocketHandler() {
super();
}
@Autowired
private FhirContext myCtx;

View File

@ -36,6 +36,13 @@ public class WebsocketConnectionValidator {
SubscriptionRegistry mySubscriptionRegistry;
/**
* Constructor
*/
public WebsocketConnectionValidator() {
super();
}
public WebsocketValidationResponse validate(@NotNull IdType id) {
if (!id.hasIdPart() || !id.isIdPartValid()) {
return WebsocketValidationResponse.INVALID_RESPONSE("Invalid bind request - No ID included: " + id.getValue());

View File

@ -36,9 +36,19 @@ public abstract class BaseSubscriberForSubscriptionResources implements MessageH
protected FhirContext myFhirContext;
protected boolean isSubscription(ResourceModifiedMessage theNewResource) {
IBaseResource payload = theNewResource.getNewPayload(myFhirContext);
String payloadIdType = myFhirContext.getResourceDefinition(payload).getName();
return payloadIdType.equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
String payloadIdType = null;
IIdType payloadId = theNewResource.getId(myFhirContext);
if (payloadId != null) {
payloadIdType = payloadId.getResourceType();
}
if (isBlank(payloadIdType)) {
IBaseResource payload = theNewResource.getNewPayload(myFhirContext);
if (payload != null) {
payloadIdType = myFhirContext.getResourceDefinition(payload).getName();
}
}
return ResourceTypeEnum.SUBSCRIPTION.getCode().equals(payloadIdType);
}
}

View File

@ -0,0 +1,76 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannelFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SubscriptionChannelFactoryTest {
private SubscriptionChannelFactory mySvc;
@Mock
private ChannelInterceptor myInterceptor;
@Captor
private ArgumentCaptor<Exception> myExceptionCaptor;
@Before
public void before() {
mySvc = new SubscriptionChannelFactory(new LinkedBlockingQueueChannelFactory());
}
/**
* Make sure the channel doesn't silently swallow exceptions
*/
@Test
public void testInterceptorsOnChannelWrapperArePropagated() {
IQueueChannelReceiver channel = mySvc.newDeliveryReceivingChannel("CHANNEL_NAME");
channel.subscribe(new NpeThrowingHandler());
channel.addInterceptor(myInterceptor);
Message<?> input = new GenericMessage<>("TEST");
when(myInterceptor.preSend(any(),any())).thenAnswer(t->t.getArgument(0, Message.class));
when(myInterceptor.postReceive(any(),any())).thenAnswer(t->t.getArgument(0, Message.class));
try {
channel.send(input);
fail();
} catch (MessageDeliveryException e) {
assertTrue(e.getCause() instanceof NullPointerException);
}
verify(myInterceptor, times(1)).afterSendCompletion(any(), any(), anyBoolean(), myExceptionCaptor.capture());
assertTrue(myExceptionCaptor.getValue() instanceof NullPointerException);
}
private class NpeThrowingHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
throw new NullPointerException("THIS IS THE MESSAGE");
}
}
}

View File

@ -6,7 +6,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;
@ -20,7 +20,6 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5;
import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.rest.api.EncodingEnum;

View File

@ -1,6 +1,6 @@
package ca.uhn.fhirtest.config;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;

View File

@ -0,0 +1,278 @@
package ca.uhn.fhir.rest.server.interceptor.auth;
/*-
* #%L
* HAPI FHIR - Server Framework
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.QualifiedParamList;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.ParameterUtil;
import ca.uhn.fhir.rest.server.exceptions.AuthenticationException;
import ca.uhn.fhir.rest.server.method.BaseMethodBinding;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.servlet.ServletSubRequestDetails;
import ca.uhn.fhir.rest.server.util.ServletRequestUtil;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.bundle.ModifiableBundleEntry;
import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.*;
import java.util.function.Consumer;
/**
* This interceptor can be used to automatically narrow the scope of searches in order to
* automatically restrict the searches to specific compartments.
* <p>
* For example, this interceptor
* could be used to restrict a user to only viewing data belonging to Patient/123 (i.e. data
* in the <code>Patient/123</code> compartment). In this case, a user performing a search
* for<br/>
* <code>http://baseurl/Observation?category=laboratory</code><br/>
* would receive results as though they had requested<br/>
* <code>http://baseurl/Observation?subject=Patient/123&category=laboratory</code>
* </p>
* <p>
* Note that this interceptor should be used in combination with {@link AuthorizationInterceptor}
* if you are restricting results because of a security restriction. This interceptor is not
* intended to be a failsafe way of preventing users from seeing the wrong data (that is the
* purpose of AuthorizationInterceptor). This interceptor is simply intended as a convenience to
* help users simplify their queries while not receiving security errors for to trying to access
* data they do not have access to see.
* </p>
*
* @see AuthorizationInterceptor
*/
public class SearchNarrowingInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(SearchNarrowingInterceptor.class);
/**
* Subclasses should override this method to supply the set of compartments that
* the user making the request should actually have access to.
* <p>
* Typically this is done by examining <code>theRequestDetails</code> to find
* out who the current user is and then building a list of Strings.
* </p>
*
* @param theRequestDetails The individual request currently being applied
* @return The list of allowed compartments and instances that should be used
* for search narrowing. If this method returns <code>null</code>, no narrowing will
* be performed
*/
protected AuthorizedList buildAuthorizedList(@SuppressWarnings("unused") RequestDetails theRequestDetails) {
return null;
}
@Hook(Pointcut.SERVER_INCOMING_REQUEST_POST_PROCESSED)
public boolean incomingRequestPostProcessed(RequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) throws AuthenticationException {
// We don't support this operation type yet
Validate.isTrue(theRequestDetails.getRestOperationType() != RestOperationTypeEnum.SEARCH_SYSTEM);
if (theRequestDetails.getRestOperationType() != RestOperationTypeEnum.SEARCH_TYPE) {
return true;
}
FhirContext ctx = theRequestDetails.getServer().getFhirContext();
RuntimeResourceDefinition resDef = ctx.getResourceDefinition(theRequestDetails.getResourceName());
HashMap<String, List<String>> parameterToOrValues = new HashMap<>();
AuthorizedList authorizedList = buildAuthorizedList(theRequestDetails);
if (authorizedList == null) {
return true;
}
/*
* Create a map of search parameter values that need to be added to the
* given request
*/
Collection<String> compartments = authorizedList.getAllowedCompartments();
if (compartments != null) {
processResourcesOrCompartments(theRequestDetails, resDef, parameterToOrValues, compartments, true);
}
Collection<String> resources = authorizedList.getAllowedInstances();
if (resources != null) {
processResourcesOrCompartments(theRequestDetails, resDef, parameterToOrValues, resources, false);
}
/*
* Add any param values to the actual request
*/
if (parameterToOrValues.size() > 0) {
Map<String, String[]> newParameters = new HashMap<>(theRequestDetails.getParameters());
for (Map.Entry<String, List<String>> nextEntry : parameterToOrValues.entrySet()) {
String nextParamName = nextEntry.getKey();
List<String> nextAllowedValues = nextEntry.getValue();
if (!newParameters.containsKey(nextParamName)) {
/*
* If we don't already have a parameter of the given type, add one
*/
String nextValuesJoined = ParameterUtil.escapeAndJoinOrList(nextAllowedValues);
String[] paramValues = {nextValuesJoined};
newParameters.put(nextParamName, paramValues);
} else {
/*
* If the client explicitly requested the given parameter already, we'll
* just update the request to have the intersection of the values that the client
* requested, and the values that the user is allowed to see
*/
String[] existingValues = newParameters.get(nextParamName);
boolean restrictedExistingList = false;
for (int i = 0; i < existingValues.length; i++) {
String nextExistingValue = existingValues[i];
List<String> nextRequestedValues = QualifiedParamList.splitQueryStringByCommasIgnoreEscape(null, nextExistingValue);
List<String> nextPermittedValues = ListUtils.intersection(nextRequestedValues, nextAllowedValues);
if (nextPermittedValues.size() > 0) {
restrictedExistingList = true;
existingValues[i] = ParameterUtil.escapeAndJoinOrList(nextPermittedValues);
}
}
/*
* If none of the values that were requested by the client overlap at all
* with the values that the user is allowed to see, we'll just add the permitted
* list as a new list. Ultimately this scenario actually means that the client
* shouldn't get *any* results back, and adding a new AND parameter (that doesn't
* overlap at all with the others) is one way of ensuring that.
*/
if (!restrictedExistingList) {
String[] newValues = Arrays.copyOf(existingValues, existingValues.length + 1);
newValues[existingValues.length] = ParameterUtil.escapeAndJoinOrList(nextAllowedValues);
newParameters.put(nextParamName, newValues);
}
}
}
theRequestDetails.setParameters(newParameters);
}
return true;
}
@Hook(Pointcut.SERVER_INCOMING_REQUEST_PRE_HANDLED)
public void incomingRequestPreHandled(ServletRequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) throws AuthenticationException {
if (theRequestDetails.getRestOperationType() != RestOperationTypeEnum.TRANSACTION) {
return;
}
IBaseBundle bundle = (IBaseBundle) theRequestDetails.getResource();
FhirContext ctx = theRequestDetails.getFhirContext();
BundleEntryUrlProcessor processor = new BundleEntryUrlProcessor(ctx, theRequestDetails, theRequest, theResponse);
BundleUtil.processEntries(ctx, bundle, processor);
}
private class BundleEntryUrlProcessor implements Consumer<ModifiableBundleEntry> {
private final FhirContext myFhirContext;
private final ServletRequestDetails myRequestDetails;
private final HttpServletRequest myRequest;
private final HttpServletResponse myResponse;
public BundleEntryUrlProcessor(FhirContext theFhirContext, ServletRequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) {
myFhirContext = theFhirContext;
myRequestDetails = theRequestDetails;
myRequest = theRequest;
myResponse = theResponse;
}
@Override
public void accept(ModifiableBundleEntry theModifiableBundleEntry) {
ArrayListMultimap<String, String> paramValues = ArrayListMultimap.create();
String url = theModifiableBundleEntry.getRequestUrl();
ServletSubRequestDetails subServletRequestDetails = ServletRequestUtil.getServletSubRequestDetails(myRequestDetails, url, paramValues);
BaseMethodBinding<?> method = subServletRequestDetails.getServer().determineResourceMethod(subServletRequestDetails, url);
RestOperationTypeEnum restOperationType = method.getRestOperationType();
subServletRequestDetails.setRestOperationType(restOperationType);
incomingRequestPostProcessed(subServletRequestDetails, myRequest, myResponse);
theModifiableBundleEntry.setRequestUrl(myFhirContext, ServletRequestUtil.extractUrl(subServletRequestDetails));
}
}
private void processResourcesOrCompartments(RequestDetails theRequestDetails, RuntimeResourceDefinition theResDef, HashMap<String, List<String>> theParameterToOrValues, Collection<String> theResourcesOrCompartments, boolean theAreCompartments) {
String lastCompartmentName = null;
String lastSearchParamName = null;
for (String nextCompartment : theResourcesOrCompartments) {
Validate.isTrue(StringUtils.countMatches(nextCompartment, '/') == 1, "Invalid compartment name (must be in form \"ResourceType/xxx\": %s", nextCompartment);
String compartmentName = nextCompartment.substring(0, nextCompartment.indexOf('/'));
String searchParamName = null;
if (compartmentName.equalsIgnoreCase(lastCompartmentName)) {
// Avoid doing a lookup for the same thing repeatedly
searchParamName = lastSearchParamName;
} else {
if (compartmentName.equalsIgnoreCase(theRequestDetails.getResourceName())) {
searchParamName = "_id";
} else if (theAreCompartments) {
List<RuntimeSearchParam> searchParams = theResDef.getSearchParamsForCompartmentName(compartmentName);
if (searchParams.size() > 0) {
// Resources like Observation have several fields that add the resource to
// the compartment. In the case of Observation, it's subject, patient and performer.
// For this kind of thing, we'll prefer the one called "patient".
RuntimeSearchParam searchParam =
searchParams
.stream()
.filter(t -> t.getName().equalsIgnoreCase(compartmentName))
.findFirst()
.orElse(searchParams.get(0));
searchParamName = searchParam.getName();
}
}
lastCompartmentName = compartmentName;
lastSearchParamName = searchParamName;
}
if (searchParamName != null) {
List<String> orValues = theParameterToOrValues.computeIfAbsent(searchParamName, t -> new ArrayList<>());
orValues.add(nextCompartment);
}
}
}
}

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2;
import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu3;
import ca.uhn.fhir.jpa.config.BaseJavaConfigR4;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.provider.BaseJpaProvider;
import ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider;

View File

@ -98,6 +98,10 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
setExpectedCount(theCount, false);
}
public boolean isSet() {
return myCountdownLatch.get() != null;
}
private void createLatch(int theCount) {
myFailures.set(Collections.synchronizedList(new ArrayList<>()));
myCalledWith.set(Collections.synchronizedList(new ArrayList<>()));