diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java
index a86607801da..b4fe6344785 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java
@@ -29,22 +29,20 @@ import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.interceptor.api.Hook;
import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
-import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer;
+import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.parser.DataFormatException;
-import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
-import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +74,6 @@ public class SubscriptionActivatingInterceptor {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingInterceptor.class);
private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
- private static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
- private static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
@Autowired
private PlatformTransactionManager myTransactionManager;
@@ -93,8 +89,6 @@ public class SubscriptionActivatingInterceptor {
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Autowired
- private MatchUrlService myMatchUrlService;
- @Autowired
private DaoConfig myDaoConfig;
@Autowired
private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
@@ -104,7 +98,7 @@ public class SubscriptionActivatingInterceptor {
// subscriber applies..
String subscriptionChannelTypeCode = myFhirContext
.newTerser()
- .getSingleValueOrNull(theSubscription, SubscriptionMatcherInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class)
+ .getSingleValueOrNull(theSubscription, SubscriptionConstants.SUBSCRIPTION_TYPE, IPrimitiveType.class)
.getValueAsString();
Subscription.SubscriptionChannelType subscriptionChannelType = Subscription.SubscriptionChannelType.fromCode(subscriptionChannelTypeCode);
@@ -113,10 +107,9 @@ public class SubscriptionActivatingInterceptor {
return false;
}
- final IPrimitiveType> status = myFhirContext.newTerser().getSingleValueOrNull(theSubscription, SubscriptionMatcherInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
- String statusString = status.getValueAsString();
+ String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
- if (REQUESTED_STATUS.equals(statusString)) {
+ if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/*
* If we're in a transaction, we don't want to try and change the status from
@@ -133,7 +126,7 @@ public class SubscriptionActivatingInterceptor {
Future> activationFuture = myTaskExecutor.submit(new Runnable() {
@Override
public void run() {
- activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS);
+ activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
}
});
@@ -152,9 +145,9 @@ public class SubscriptionActivatingInterceptor {
});
return true;
} else {
- return activateSubscription(ACTIVE_STATUS, theSubscription, REQUESTED_STATUS);
+ return activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
}
- } else if (ACTIVE_STATUS.equals(statusString)) {
+ } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) {
return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else {
// Status isn't "active" or "requested"
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java
index 94d4fc65600..d5d5c665305 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java
@@ -48,8 +48,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
private static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
- static final String SUBSCRIPTION_STATUS = "Subscription.status";
- static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
private SubscribableChannel myProcessingChannel;
@Autowired
diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml
index 49c0b24cfbf..c19ed86448a 100644
--- a/hapi-fhir-jpaserver-subscription/pom.xml
+++ b/hapi-fhir-jpaserver-subscription/pom.xml
@@ -97,6 +97,11 @@
jetty-servlet
test
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java
index 7e846cf68a7..f82cd27d3ce 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/ResourceModifiedMessage.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@@ -168,11 +169,13 @@ public class ResourceModifiedMessage implements IResourceMessage {
@Override
public String toString() {
- String resourceId = myPayloadId;
- if (resourceId == null) {
- resourceId = myId;
- }
- return "ResourceModified Message { " + myOperationType + ", " + resourceId + "}";
+ return new ToStringBuilder(this)
+ .append("myId", myId)
+ .append("myOperationType", myOperationType)
+ .append("mySubscriptionId", mySubscriptionId)
+// .append("myPayload", myPayload)
+ .append("myPayloadId", myPayloadId)
+// .append("myPayloadDecoded", myPayloadDecoded)
+ .toString();
}
-
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java
index cd0802bf645..c5572024a7c 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java
@@ -29,11 +29,11 @@ import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IPrimitiveDatatype;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
-import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseMetaType;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
+import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -292,4 +292,13 @@ public class SubscriptionCanonicalizer {
}
meta.addTag().setSystem(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY).setCode(value).setDisplay(display);
}
+
+ public String getSubscriptionStatus(IBaseResource theSubscription) {
+ final IPrimitiveType> status = myFhirContext.newTerser().getSingleValueOrNull(theSubscription, SubscriptionConstants.SUBSCRIPTION_STATUS, IPrimitiveType.class);
+ if (status == null) {
+ return null;
+ }
+ return status.getValueAsString();
+ }
+
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java
index 66f4cd58d18..2aa9f2be379 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionConstants.java
@@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L%
*/
+import org.hl7.fhir.instance.model.Subscription;
+
public class SubscriptionConstants {
/**
@@ -90,4 +92,8 @@ public class SubscriptionConstants {
*/
public static final int DELIVERY_EXECUTOR_QUEUE_SIZE = 1000;
+ public static final String SUBSCRIPTION_STATUS = "Subscription.status";
+ public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
+ public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
+ public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java
index 8fd70e7f358..bb67f790cca 100755
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionLoader.java
@@ -87,7 +87,6 @@ public class SubscriptionLoader {
ourLog.debug("Starting sync subscriptions");
SearchParameterMap map = new SearchParameterMap();
map.add(Subscription.SP_STATUS, new TokenOrListParam()
- // TODO KHS perhaps we should only be requesting ACTIVE subscriptions here?...
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java
index 1ec1447ff51..a57bb0e3cf4 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java
@@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
+import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer;
+import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
@@ -46,6 +48,8 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
@Autowired
SubscriptionRegistry mySubscriptionRegistry;
+ @Autowired
+ SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Override
public void handleMessage(Message> theMessage) throws MessagingException {
@@ -61,7 +65,10 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource);
if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) {
- mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
+ String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource);
+ if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
+ mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
+ }
}
mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage);
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java
index 413d639fa02..3a9884f5f23 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryJsonMessage.java
@@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.builder.ToStringBuilder;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
@@ -54,4 +55,10 @@ public class ResourceDeliveryJsonMessage extends BaseJsonMessage theList) {
StopWatch sw = new StopWatch();
while (theList.size() != theTarget && sw.getMillis() <= 16000) {
@@ -37,4 +41,16 @@ public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest {
fail("Size " + theList.size() + " is != target " + theTarget + " - Got: " + describeResults);
}
}
+
+ protected long nextId() {
+ return mySubscriptionTestHelper.nextId();
+ }
+
+ protected Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) {
+ return mySubscriptionTestHelper.makeActiveSubscription(theCriteria, thePayload, theEndpoint);
+ }
+
+ protected Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) {
+ return mySubscriptionTestHelper.makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, status);
+ }
}
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java
index 9a1996a127e..06cef734fbe 100644
--- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java
@@ -44,5 +44,4 @@ public abstract class BaseSubscriptionTest {
myMockFhirClientSubscriptionProvider.setBundleProvider(theBundleProvider);
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
}
-
}
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java
index 0049bef852f..874157a9546 100644
--- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/FhirObjectPrinter.java
@@ -1,7 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module;
-import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryJsonMessage;
-import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import java.util.function.Function;
@@ -11,16 +9,7 @@ public class FhirObjectPrinter implements Function