diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa/configuration.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa/configuration.md
index 25125d85b08..0b33e7c4f52 100644
--- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa/configuration.md
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa/configuration.md
@@ -56,7 +56,7 @@ web address.
A common use for logical references is in references to conformance resources, such as ValueSets, StructureDefinitions, etc. For example, you might refer to the ValueSet `http://hl7.org/fhir/ValueSet/quantity-comparator` from your own resources. In this case, you are not necessarily telling the server that this is a real address that it should resolve, but rather that this is an identifier for a ValueSet where `ValueSet.url` has the given URI/URL.
-HAPI can be configured to treat certain URI/URL patterns as logical by using the DaoConfig#setTreatReferencesAsLogical property (see [JavaDoc](/hapi-fhir/apidocs/hapi-fhir-jpaserver-base/ca/uhn/fhir/jpa/dao/DaoConfig.html#setTreatReferencesAsLogical(java.util.Set))).
+HAPI can be configured to treat certain URI/URL patterns as logical by using the DaoConfig#setTreatReferencesAsLogical property (see [JavaDoc](/hapi-fhir/apidocs/hapi-fhir-jpaserver-api/ca/uhn/fhir/jpa/api/config/DaoConfig.html#setTreatReferencesAsLogical(java.util.Set))).
For example:
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java
index 93ce55c988d..539d27c2c1b 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/BaseResourceProviderDstu2Test.java
@@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.provider;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu2.BaseJpaDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.model.dstu2.resource.Bundle;
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java
index 445a7766ffb..cce16453f54 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java
@@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.context.support.IValidationSupport;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java
index dc45df8c348..23cd7b33f9d 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java
@@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java
index fb3c410147f..578da58bc53 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java
@@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r5;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r5.BaseJpaR5Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
index 92321b7f44b..9ec88c0e3d6 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
@@ -20,10 +20,6 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.SubscribableChannel;
-
-
/**
* This interface is the factory for Queue Channels, which are the low level abstraction over a
* queue (e.g. memory queue, JMS queue, Kafka stream, etc.) for any purpose.
@@ -34,20 +30,38 @@ public interface IQueueChannelFactory {
* Create a channel that is used to receive messages from the queue.
*
*
- * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, int)}
+ * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
*
+ *
+ * @param theChannelName The actual underlying queue name
+ * @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
+ * @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
+ * both {@link #getOrCreateReceiver} and
+ * {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
+ * even though this object is used to configure the sender only. We do this because the factory
+ * may want to create a single object to be used for both the sender and receiver, so this allows
+ * the config details to be known regardless of which method is returned first.
*/
- SubscribableChannel getOrCreateReceiver(String theChannelName, Class> theMessageType, int theConcurrentConsumers);
+ IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class> theMessageType, QueueChannelConsumerConfig theConfig);
/**
* Create a channel that is used to send messages to the queue.
*
*
- * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, int)}
+ * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
*
+ *
+ * @param theChannelName The actual underlying queue name
+ * @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
+ * @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
+ * both {@link #getOrCreateReceiver} and
+ * {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
+ * even though this object is used to configure the sender only. We do this because the factory
+ * may want to create a single object to be used for both the sender and receiver, so this allows
+ * the config details to be known regardless of which method is returned first.
*/
- MessageChannel getOrCreateSender(String theChannelName, Class> theMessageType, int theConcurrentConsumers);
+ IQueueChannelSender getOrCreateSender(String theChannelName, Class> theMessageType, QueueChannelConsumerConfig theConfig);
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelReceiver.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelReceiver.java
new file mode 100644
index 00000000000..eca925b288a
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelReceiver.java
@@ -0,0 +1,27 @@
+package ca.uhn.fhir.jpa.subscription.channel.queue;
+
+/*-
+ * #%L
+ * HAPI FHIR Subscription Server
+ * %%
+ * Copyright (C) 2014 - 2020 University Health Network
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.InterceptableChannel;
+
+public interface IQueueChannelReceiver extends SubscribableChannel, InterceptableChannel {
+}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelSender.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelSender.java
new file mode 100644
index 00000000000..f231347072b
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelSender.java
@@ -0,0 +1,28 @@
+package ca.uhn.fhir.jpa.subscription.channel.queue;
+
+/*-
+ * #%L
+ * HAPI FHIR Subscription Server
+ * %%
+ * Copyright (C) 2014 - 2020 University Health Network
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.InterceptableChannel;
+
+public interface IQueueChannelSender extends MessageChannel, InterceptableChannel {
+}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannel.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannel.java
index 4a8b091fba5..1695ac6a410 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannel.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannel.java
@@ -20,84 +20,14 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
-import ca.uhn.fhir.util.StopWatch;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.SubscribableChannel;
-import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
-import java.util.ArrayList;
-import java.util.concurrent.*;
+import java.util.concurrent.ThreadPoolExecutor;
-public class LinkedBlockingQueueChannel implements SubscribableChannel {
- private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueChannel.class);
+public class LinkedBlockingQueueChannel extends ExecutorSubscribableChannel implements IQueueChannelSender, IQueueChannelReceiver {
- private final ExecutorSubscribableChannel mySubscribableChannel;
- private final BlockingQueue myQueue;
-
- public LinkedBlockingQueueChannel(BlockingQueue theQueue, String theThreadNamingPattern, int theConcurrentConsumers) {
-
- ThreadFactory threadFactory = new BasicThreadFactory.Builder()
- .namingPattern(theThreadNamingPattern)
- .daemon(false)
- .priority(Thread.NORM_PRIORITY)
- .build();
- RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
- ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", theQueue.size());
- StopWatch sw = new StopWatch();
- try {
- theQueue.put(theRunnable);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RejectedExecutionException("Task " + theRunnable.toString() +
- " rejected from " + e.toString());
- }
- ourLog.info("Slot become available after {}ms", sw.getMillis());
- };
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 1,
- theConcurrentConsumers,
- 0L,
- TimeUnit.MILLISECONDS,
- theQueue,
- threadFactory,
- rejectedExecutionHandler);
- myQueue = theQueue;
- mySubscribableChannel = new ExecutorSubscribableChannel(executor);
+ public LinkedBlockingQueueChannel(ThreadPoolExecutor theExecutor) {
+ super(theExecutor);
}
- @Override
- public boolean subscribe(MessageHandler handler) {
- return mySubscribableChannel.subscribe(handler);
- }
-
- @Override
- public boolean unsubscribe(MessageHandler handler) {
- return mySubscribableChannel.unsubscribe(handler);
- }
-
- @Override
- public boolean send(Message> message, long timeout) {
- return mySubscribableChannel.send(message, timeout);
- }
-
- @VisibleForTesting
- public void clearInterceptorsForUnitTest() {
- mySubscribableChannel.setInterceptors(new ArrayList<>());
- }
-
- @VisibleForTesting
- public void addInterceptorForUnitTest(ChannelInterceptor theInterceptor) {
- mySubscribableChannel.addInterceptor(theInterceptor);
- }
-
- @VisibleForTesting
- public int getQueueSizeForUnitTest() {
- return myQueue.size();
- }
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
index 07c2f0f710a..2f069fe6c9d 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
@@ -21,17 +21,27 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
*/
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
-import org.springframework.messaging.MessageChannel;
+import ca.uhn.fhir.util.StopWatch;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
- private Map myChannels = Collections.synchronizedMap(new HashMap<>());
+ private Map myChannels = Collections.synchronizedMap(new HashMap<>());
+ private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueChannelFactory.class);
/**
* Constructor
@@ -41,20 +51,49 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
}
@Override
- public SubscribableChannel getOrCreateReceiver(String theChannelName, Class> theMessageType, int theConcurrentConsumers) {
- return getOrCreateChannel(theChannelName, theConcurrentConsumers);
+ public IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class> theMessageType, QueueChannelConsumerConfig theConfig) {
+ return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
@Override
- public MessageChannel getOrCreateSender(String theChannelName, Class> theMessageType, int theConcurrentConsumers) {
- return getOrCreateChannel(theChannelName, theConcurrentConsumers);
+ public IQueueChannelSender getOrCreateSender(String theChannelName, Class> theMessageType, QueueChannelConsumerConfig theConfig) {
+ return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
- private SubscribableChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
+ private LinkedBlockingQueueChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
return myChannels.computeIfAbsent(theChannelName, t -> {
- LinkedBlockingQueue queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
+
String threadNamingPattern = theChannelName + "-%d";
- return new LinkedBlockingQueueChannel(queue, threadNamingPattern, theConcurrentConsumers);
+
+ ThreadFactory threadFactory = new BasicThreadFactory.Builder()
+ .namingPattern(threadNamingPattern)
+ .daemon(false)
+ .priority(Thread.NORM_PRIORITY)
+ .build();
+
+ LinkedBlockingQueue queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
+ RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
+ ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", queue.size());
+ StopWatch sw = new StopWatch();
+ try {
+ queue.put(theRunnable);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RejectedExecutionException("Task " + theRunnable.toString() +
+ " rejected from " + e.toString());
+ }
+ ourLog.info("Slot become available after {}ms", sw.getMillis());
+ };
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ 1,
+ theConcurrentConsumers,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ queue,
+ threadFactory,
+ rejectedExecutionHandler);
+ return new LinkedBlockingQueueChannel(executor);
+
});
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/QueueChannelConsumerConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/QueueChannelConsumerConfig.java
new file mode 100644
index 00000000000..ce465b0ade7
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/QueueChannelConsumerConfig.java
@@ -0,0 +1,35 @@
+package ca.uhn.fhir.jpa.subscription.channel.queue;
+
+/*-
+ * #%L
+ * HAPI FHIR Subscription Server
+ * %%
+ * Copyright (C) 2014 - 2020 University Health Network
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+public class QueueChannelConsumerConfig {
+
+ private int myConcurrentConsumers;
+
+ public int getConcurrentConsumers() {
+ return myConcurrentConsumers;
+ }
+
+ public void setConcurrentConsumers(int theConcurrentConsumers) {
+ myConcurrentConsumers = theConcurrentConsumers;
+ }
+
+}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
index 3603fbd8f23..c25aab6bfa0 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
@@ -21,23 +21,22 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
*/
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
+import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
+import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelSender;
+import ca.uhn.fhir.jpa.subscription.channel.queue.QueueChannelConsumerConfig;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.apache.commons.lang3.Validate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
+import org.springframework.messaging.support.ChannelInterceptor;
public class SubscriptionChannelFactory {
- private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionChannelFactory.class);
private final IQueueChannelFactory myQueueChannelFactory;
/**
@@ -48,24 +47,40 @@ public class SubscriptionChannelFactory {
myQueueChannelFactory = theQueueChannelFactory;
}
- public MessageChannel newDeliverySendingChannel(String theChannelName) {
- return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
+ public IQueueChannelSender newDeliverySendingChannel(String theChannelName) {
+ QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
+ return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, config);
}
- public SubscribableChannel newDeliveryChannel(String theChannelName) {
- SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
+ public IQueueChannelReceiver newDeliveryReceivingChannel(String theChannelName) {
+ QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
+ IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
- public MessageChannel newMatchingSendingChannel(String theChannelName) {
- return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
+ public IQueueChannelSender newMatchingSendingChannel(String theChannelName) {
+ QueueChannelConsumerConfig config = newConfigForMatchingChannel();
+ return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, config);
}
- public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {
- SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
+ public IQueueChannelReceiver newMatchingReceivingChannel(String theChannelName) {
+ QueueChannelConsumerConfig config = newConfigForMatchingChannel();
+ IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
+ protected QueueChannelConsumerConfig newConfigForDeliveryChannel() {
+ QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
+ config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
+ return config;
+ }
+
+ protected QueueChannelConsumerConfig newConfigForMatchingChannel() {
+ QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
+ config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
+ return config;
+ }
+
public int getDeliveryChannelConcurrentConsumers() {
return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
}
@@ -74,12 +89,12 @@ public class SubscriptionChannelFactory {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
}
- public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean {
+ public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IQueueChannelReceiver, DisposableBean {
- private final SubscribableChannel myWrappedChannel;
+ private final IQueueChannelReceiver myWrappedChannel;
- public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) {
- theChannel.subscribe(this);
+ public BroadcastingSubscribableChannelWrapper(IQueueChannelReceiver theChannel) {
+ theChannel.subscribe(message -> send(message));
myWrappedChannel = theChannel;
}
@@ -89,20 +104,10 @@ public class SubscriptionChannelFactory {
@Override
protected boolean sendInternal(Message> theMessage, long timeout) {
-// try {
- for (MessageHandler next : getSubscribers()) {
- next.handleMessage(theMessage);
- }
- return true;
-// } catch (Exception e) {
-// ourLog.error("Failiure handling message", e);
-// return false;
-// }
- }
-
- @Override
- public void handleMessage(Message> message) throws MessagingException {
- send(message);
+ for (MessageHandler next : getSubscribers()) {
+ next.handleMessage(theMessage);
+ }
+ return true;
}
@Override
@@ -112,5 +117,12 @@ public class SubscriptionChannelFactory {
}
}
+ @Override
+ public void addInterceptor(ChannelInterceptor interceptor) {
+ super.addInterceptor(interceptor);
+ myWrappedChannel.addInterceptor(interceptor);
+ }
+
+
}
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
index d5bffd1cb5c..f38def3d0f9 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
@@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
@@ -32,7 +31,6 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
-import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +59,7 @@ public class SubscriptionChannelRegistry {
return;
}
- SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
+ SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(channelName);
Optional deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
index 1f7eed79122..318ab1c403a 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
@@ -97,11 +97,6 @@ public class SubscriptionProcessorConfig {
return new DaoResourceRetriever();
}
- @Bean
- public WebsocketConnectionValidator websocketConnectionValidator() {
- return new WebsocketConnectionValidator();
- }
-
@Bean
public SubscriptionLoader subscriptionLoader() {
return new SubscriptionLoader();
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDispatcherConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/WebsocketDispatcherConfig.java
similarity index 87%
rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDispatcherConfig.java
rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/WebsocketDispatcherConfig.java
index c88275d7cb6..845c52322b9 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/WebsocketDispatcherConfig.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/WebsocketDispatcherConfig.java
@@ -1,8 +1,8 @@
-package ca.uhn.fhir.jpa.config;
+package ca.uhn.fhir.jpa.subscription.process.config;
/*
* #%L
- * HAPI FHIR JPA Server
+ * HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
@@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.SubscriptionWebsocketHandler;
+import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketConnectionValidator;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -37,9 +38,15 @@ import org.springframework.web.socket.handler.PerConnectionWebSocketHandler;
@EnableWebSocket()
@Controller
public class WebsocketDispatcherConfig implements WebSocketConfigurer {
+
@Autowired
ModelConfig myModelConfig;
+ @Bean
+ public WebsocketConnectionValidator websocketConnectionValidator() {
+ return new WebsocketConnectionValidator();
+ }
+
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry theRegistry) {
theRegistry.addHandler(subscriptionWebSocketHandler(), myModelConfig.getWebsocketContextPath()).setAllowedOrigins("*");
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/SubscriptionWebsocketHandler.java
index 58d0ebe8a3b..c36e686de85 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/SubscriptionWebsocketHandler.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/SubscriptionWebsocketHandler.java
@@ -50,6 +50,13 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
@Autowired
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
+ /**
+ * Constructor
+ */
+ public SubscriptionWebsocketHandler() {
+ super();
+ }
+
@Autowired
private FhirContext myCtx;
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/WebsocketConnectionValidator.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/WebsocketConnectionValidator.java
index 0442db66a03..73819f05db2 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/WebsocketConnectionValidator.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/websocket/WebsocketConnectionValidator.java
@@ -36,6 +36,13 @@ public class WebsocketConnectionValidator {
SubscriptionRegistry mySubscriptionRegistry;
+ /**
+ * Constructor
+ */
+ public WebsocketConnectionValidator() {
+ super();
+ }
+
public WebsocketValidationResponse validate(@NotNull IdType id) {
if (!id.hasIdPart() || !id.isIdPartValid()) {
return WebsocketValidationResponse.INVALID_RESPONSE("Invalid bind request - No ID included: " + id.getValue());
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java
index 730296ab0f8..d81a86a8212 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java
@@ -36,9 +36,19 @@ public abstract class BaseSubscriberForSubscriptionResources implements MessageH
protected FhirContext myFhirContext;
protected boolean isSubscription(ResourceModifiedMessage theNewResource) {
- IBaseResource payload = theNewResource.getNewPayload(myFhirContext);
- String payloadIdType = myFhirContext.getResourceDefinition(payload).getName();
- return payloadIdType.equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
+ String payloadIdType = null;
+ IIdType payloadId = theNewResource.getId(myFhirContext);
+ if (payloadId != null) {
+ payloadIdType = payloadId.getResourceType();
+ }
+ if (isBlank(payloadIdType)) {
+ IBaseResource payload = theNewResource.getNewPayload(myFhirContext);
+ if (payload != null) {
+ payloadIdType = myFhirContext.getResourceDefinition(payload).getName();
+ }
+ }
+
+ return ResourceTypeEnum.SUBSCRIPTION.getCode().equals(payloadIdType);
}
}
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java
new file mode 100644
index 00000000000..6f3b979551d
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java
@@ -0,0 +1,76 @@
+package ca.uhn.fhir.jpa.subscription.channel.subscription;
+
+import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
+import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannelFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageDeliveryException;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.GenericMessage;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionChannelFactoryTest {
+
+ private SubscriptionChannelFactory mySvc;
+
+ @Mock
+ private ChannelInterceptor myInterceptor;
+ @Captor
+ private ArgumentCaptor myExceptionCaptor;
+
+ @Before
+ public void before() {
+ mySvc = new SubscriptionChannelFactory(new LinkedBlockingQueueChannelFactory());
+ }
+
+ /**
+ * Make sure the channel doesn't silently swallow exceptions
+ */
+ @Test
+ public void testInterceptorsOnChannelWrapperArePropagated() {
+
+ IQueueChannelReceiver channel = mySvc.newDeliveryReceivingChannel("CHANNEL_NAME");
+ channel.subscribe(new NpeThrowingHandler());
+ channel.addInterceptor(myInterceptor);
+
+ Message> input = new GenericMessage<>("TEST");
+
+ when(myInterceptor.preSend(any(),any())).thenAnswer(t->t.getArgument(0, Message.class));
+ when(myInterceptor.postReceive(any(),any())).thenAnswer(t->t.getArgument(0, Message.class));
+
+ try {
+ channel.send(input);
+ fail();
+ } catch (MessageDeliveryException e) {
+ assertTrue(e.getCause() instanceof NullPointerException);
+ }
+
+ verify(myInterceptor, times(1)).afterSendCompletion(any(), any(), anyBoolean(), myExceptionCaptor.capture());
+
+ assertTrue(myExceptionCaptor.getValue() instanceof NullPointerException);
+ }
+
+
+ private class NpeThrowingHandler implements MessageHandler {
+ @Override
+ public void handleMessage(Message> message) throws MessagingException {
+ throw new NullPointerException("THIS IS THE MESSAGE");
+ }
+ }
+}
diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
index 52d8ffaa687..c1bdde59ea3 100644
--- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
+++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
@@ -6,7 +6,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;
@@ -20,7 +20,6 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5;
import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
-import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.rest.api.EncodingEnum;
diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java
index 53bb186f8c2..9a964fd103b 100644
--- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java
+++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/CommonConfig.java
@@ -1,6 +1,6 @@
package ca.uhn.fhirtest.config;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
+import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/SearchNarrowingInterceptor.java_703256810379985 b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/SearchNarrowingInterceptor.java_703256810379985
new file mode 100644
index 00000000000..3ede68f6b6a
--- /dev/null
+++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/SearchNarrowingInterceptor.java_703256810379985
@@ -0,0 +1,278 @@
+package ca.uhn.fhir.rest.server.interceptor.auth;
+
+/*-
+ * #%L
+ * HAPI FHIR - Server Framework
+ * %%
+ * Copyright (C) 2014 - 2020 University Health Network
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.context.RuntimeResourceDefinition;
+import ca.uhn.fhir.context.RuntimeSearchParam;
+import ca.uhn.fhir.interceptor.api.Hook;
+import ca.uhn.fhir.interceptor.api.Pointcut;
+import ca.uhn.fhir.rest.api.QualifiedParamList;
+import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
+import ca.uhn.fhir.rest.api.server.RequestDetails;
+import ca.uhn.fhir.rest.param.ParameterUtil;
+import ca.uhn.fhir.rest.server.exceptions.AuthenticationException;
+import ca.uhn.fhir.rest.server.method.BaseMethodBinding;
+import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
+import ca.uhn.fhir.rest.server.servlet.ServletSubRequestDetails;
+import ca.uhn.fhir.rest.server.util.ServletRequestUtil;
+import ca.uhn.fhir.util.BundleUtil;
+import ca.uhn.fhir.util.bundle.ModifiableBundleEntry;
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.hl7.fhir.instance.model.api.IBaseBundle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.*;
+import java.util.function.Consumer;
+
+/**
+ * This interceptor can be used to automatically narrow the scope of searches in order to
+ * automatically restrict the searches to specific compartments.
+ *
+ * For example, this interceptor
+ * could be used to restrict a user to only viewing data belonging to Patient/123 (i.e. data
+ * in the Patient/123
compartment). In this case, a user performing a search
+ * for
+ * http://baseurl/Observation?category=laboratory
+ * would receive results as though they had requested
+ * http://baseurl/Observation?subject=Patient/123&category=laboratory
+ *
+ *
+ * Note that this interceptor should be used in combination with {@link AuthorizationInterceptor}
+ * if you are restricting results because of a security restriction. This interceptor is not
+ * intended to be a failsafe way of preventing users from seeing the wrong data (that is the
+ * purpose of AuthorizationInterceptor). This interceptor is simply intended as a convenience to
+ * help users simplify their queries while not receiving security errors for to trying to access
+ * data they do not have access to see.
+ *
+ *
+ * @see AuthorizationInterceptor
+ */
+public class SearchNarrowingInterceptor {
+ private static final Logger ourLog = LoggerFactory.getLogger(SearchNarrowingInterceptor.class);
+
+
+ /**
+ * Subclasses should override this method to supply the set of compartments that
+ * the user making the request should actually have access to.
+ *
+ * Typically this is done by examining theRequestDetails
to find
+ * out who the current user is and then building a list of Strings.
+ *
+ *
+ * @param theRequestDetails The individual request currently being applied
+ * @return The list of allowed compartments and instances that should be used
+ * for search narrowing. If this method returns null
, no narrowing will
+ * be performed
+ */
+ protected AuthorizedList buildAuthorizedList(@SuppressWarnings("unused") RequestDetails theRequestDetails) {
+ return null;
+ }
+
+ @Hook(Pointcut.SERVER_INCOMING_REQUEST_POST_PROCESSED)
+ public boolean incomingRequestPostProcessed(RequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) throws AuthenticationException {
+ // We don't support this operation type yet
+ Validate.isTrue(theRequestDetails.getRestOperationType() != RestOperationTypeEnum.SEARCH_SYSTEM);
+
+ if (theRequestDetails.getRestOperationType() != RestOperationTypeEnum.SEARCH_TYPE) {
+ return true;
+ }
+
+ FhirContext ctx = theRequestDetails.getServer().getFhirContext();
+ RuntimeResourceDefinition resDef = ctx.getResourceDefinition(theRequestDetails.getResourceName());
+ HashMap> parameterToOrValues = new HashMap<>();
+ AuthorizedList authorizedList = buildAuthorizedList(theRequestDetails);
+ if (authorizedList == null) {
+ return true;
+ }
+
+ /*
+ * Create a map of search parameter values that need to be added to the
+ * given request
+ */
+ Collection compartments = authorizedList.getAllowedCompartments();
+ if (compartments != null) {
+ processResourcesOrCompartments(theRequestDetails, resDef, parameterToOrValues, compartments, true);
+ }
+ Collection resources = authorizedList.getAllowedInstances();
+ if (resources != null) {
+ processResourcesOrCompartments(theRequestDetails, resDef, parameterToOrValues, resources, false);
+ }
+
+ /*
+ * Add any param values to the actual request
+ */
+ if (parameterToOrValues.size() > 0) {
+ Map newParameters = new HashMap<>(theRequestDetails.getParameters());
+ for (Map.Entry> nextEntry : parameterToOrValues.entrySet()) {
+ String nextParamName = nextEntry.getKey();
+ List nextAllowedValues = nextEntry.getValue();
+
+ if (!newParameters.containsKey(nextParamName)) {
+
+ /*
+ * If we don't already have a parameter of the given type, add one
+ */
+ String nextValuesJoined = ParameterUtil.escapeAndJoinOrList(nextAllowedValues);
+ String[] paramValues = {nextValuesJoined};
+ newParameters.put(nextParamName, paramValues);
+
+ } else {
+
+ /*
+ * If the client explicitly requested the given parameter already, we'll
+ * just update the request to have the intersection of the values that the client
+ * requested, and the values that the user is allowed to see
+ */
+ String[] existingValues = newParameters.get(nextParamName);
+ boolean restrictedExistingList = false;
+ for (int i = 0; i < existingValues.length; i++) {
+
+ String nextExistingValue = existingValues[i];
+ List nextRequestedValues = QualifiedParamList.splitQueryStringByCommasIgnoreEscape(null, nextExistingValue);
+ List nextPermittedValues = ListUtils.intersection(nextRequestedValues, nextAllowedValues);
+ if (nextPermittedValues.size() > 0) {
+ restrictedExistingList = true;
+ existingValues[i] = ParameterUtil.escapeAndJoinOrList(nextPermittedValues);
+ }
+
+ }
+
+ /*
+ * If none of the values that were requested by the client overlap at all
+ * with the values that the user is allowed to see, we'll just add the permitted
+ * list as a new list. Ultimately this scenario actually means that the client
+ * shouldn't get *any* results back, and adding a new AND parameter (that doesn't
+ * overlap at all with the others) is one way of ensuring that.
+ */
+ if (!restrictedExistingList) {
+ String[] newValues = Arrays.copyOf(existingValues, existingValues.length + 1);
+ newValues[existingValues.length] = ParameterUtil.escapeAndJoinOrList(nextAllowedValues);
+ newParameters.put(nextParamName, newValues);
+ }
+ }
+
+ }
+ theRequestDetails.setParameters(newParameters);
+ }
+
+ return true;
+ }
+
+ @Hook(Pointcut.SERVER_INCOMING_REQUEST_PRE_HANDLED)
+ public void incomingRequestPreHandled(ServletRequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) throws AuthenticationException {
+ if (theRequestDetails.getRestOperationType() != RestOperationTypeEnum.TRANSACTION) {
+ return;
+ }
+
+ IBaseBundle bundle = (IBaseBundle) theRequestDetails.getResource();
+ FhirContext ctx = theRequestDetails.getFhirContext();
+ BundleEntryUrlProcessor processor = new BundleEntryUrlProcessor(ctx, theRequestDetails, theRequest, theResponse);
+ BundleUtil.processEntries(ctx, bundle, processor);
+ }
+
+ private class BundleEntryUrlProcessor implements Consumer {
+ private final FhirContext myFhirContext;
+ private final ServletRequestDetails myRequestDetails;
+ private final HttpServletRequest myRequest;
+ private final HttpServletResponse myResponse;
+
+ public BundleEntryUrlProcessor(FhirContext theFhirContext, ServletRequestDetails theRequestDetails, HttpServletRequest theRequest, HttpServletResponse theResponse) {
+ myFhirContext = theFhirContext;
+ myRequestDetails = theRequestDetails;
+ myRequest = theRequest;
+ myResponse = theResponse;
+ }
+
+ @Override
+ public void accept(ModifiableBundleEntry theModifiableBundleEntry) {
+ ArrayListMultimap paramValues = ArrayListMultimap.create();
+
+ String url = theModifiableBundleEntry.getRequestUrl();
+
+ ServletSubRequestDetails subServletRequestDetails = ServletRequestUtil.getServletSubRequestDetails(myRequestDetails, url, paramValues);
+ BaseMethodBinding> method = subServletRequestDetails.getServer().determineResourceMethod(subServletRequestDetails, url);
+ RestOperationTypeEnum restOperationType = method.getRestOperationType();
+ subServletRequestDetails.setRestOperationType(restOperationType);
+
+ incomingRequestPostProcessed(subServletRequestDetails, myRequest, myResponse);
+
+ theModifiableBundleEntry.setRequestUrl(myFhirContext, ServletRequestUtil.extractUrl(subServletRequestDetails));
+ }
+ }
+
+ private void processResourcesOrCompartments(RequestDetails theRequestDetails, RuntimeResourceDefinition theResDef, HashMap> theParameterToOrValues, Collection theResourcesOrCompartments, boolean theAreCompartments) {
+ String lastCompartmentName = null;
+ String lastSearchParamName = null;
+ for (String nextCompartment : theResourcesOrCompartments) {
+ Validate.isTrue(StringUtils.countMatches(nextCompartment, '/') == 1, "Invalid compartment name (must be in form \"ResourceType/xxx\": %s", nextCompartment);
+ String compartmentName = nextCompartment.substring(0, nextCompartment.indexOf('/'));
+
+ String searchParamName = null;
+ if (compartmentName.equalsIgnoreCase(lastCompartmentName)) {
+
+ // Avoid doing a lookup for the same thing repeatedly
+ searchParamName = lastSearchParamName;
+
+ } else {
+
+ if (compartmentName.equalsIgnoreCase(theRequestDetails.getResourceName())) {
+
+ searchParamName = "_id";
+
+ } else if (theAreCompartments) {
+
+ List searchParams = theResDef.getSearchParamsForCompartmentName(compartmentName);
+ if (searchParams.size() > 0) {
+
+ // Resources like Observation have several fields that add the resource to
+ // the compartment. In the case of Observation, it's subject, patient and performer.
+ // For this kind of thing, we'll prefer the one called "patient".
+ RuntimeSearchParam searchParam =
+ searchParams
+ .stream()
+ .filter(t -> t.getName().equalsIgnoreCase(compartmentName))
+ .findFirst()
+ .orElse(searchParams.get(0));
+ searchParamName = searchParam.getName();
+
+ }
+ }
+
+ lastCompartmentName = compartmentName;
+ lastSearchParamName = searchParamName;
+
+ }
+
+ if (searchParamName != null) {
+ List orValues = theParameterToOrValues.computeIfAbsent(searchParamName, t -> new ArrayList<>());
+ orValues.add(nextCompartment);
+ }
+ }
+ }
+
+}
diff --git a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/src/main/java/ca/uhn/fhir/spring/boot/autoconfigure/FhirAutoConfiguration.java b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/src/main/java/ca/uhn/fhir/spring/boot/autoconfigure/FhirAutoConfiguration.java
index e72f835c0bb..0306728a29a 100644
--- a/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/src/main/java/ca/uhn/fhir/spring/boot/autoconfigure/FhirAutoConfiguration.java
+++ b/hapi-fhir-spring-boot/hapi-fhir-spring-boot-autoconfigure/src/main/java/ca/uhn/fhir/spring/boot/autoconfigure/FhirAutoConfiguration.java
@@ -27,7 +27,6 @@ import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2;
import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu3;
import ca.uhn.fhir.jpa.config.BaseJavaConfigR4;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
-import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.provider.BaseJpaProvider;
import ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider;
diff --git a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java
index e9128ae289a..af4e01ebac0 100644
--- a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java
+++ b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java
@@ -98,6 +98,10 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
setExpectedCount(theCount, false);
}
+ public boolean isSet() {
+ return myCountdownLatch.get() != null;
+ }
+
private void createLatch(int theCount) {
myFailures.set(Collections.synchronizedList(new ArrayList<>()));
myCalledWith.set(Collections.synchronizedList(new ArrayList<>()));