diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 2d060c56575..8145dff30d1 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -478,6 +478,8 @@ public abstract class BaseSubscriptionInterceptor exten TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { + // FIXME: remove + ourLog.info("** Sending processing message " + theMessage + " for: " + theMessage.getNewPayload(myCtx)); ourLog.trace("Sending resource modified message to processing channel"); getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage)); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java index 75b4126b4b7..d1c561db64c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java @@ -132,7 +132,7 @@ public class CanonicalSubscription implements Serializable { return retVal; } - String getIdElementString() { + public String getIdElementString() { return myIdElement; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java index f7f7c8d374c..ca1ffa7ad7d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java @@ -48,6 +48,13 @@ public class ResourceDeliveryMessage { return myOperationType; } + /** + * Constructor + */ + public ResourceDeliveryMessage() { + super(); + } + public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) { myOperationType = theOperationType; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index 7b80f1dfef8..209f5cb23bc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -69,7 +69,7 @@ public class SubscriptionActivatingSubscriber { Validate.notNull(theTaskExecutor); } - public synchronized boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) { + public boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) { // Grab the value for "Subscription.channel.type" so we can see if this // subscriber applies.. String subscriptionChannelType = myCtx @@ -180,7 +180,7 @@ public class SubscriptionActivatingSubscriber { } - private synchronized void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { + private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); txTemplate.execute(new TransactionCallbackWithoutResult() { @Override @@ -190,7 +190,7 @@ public class SubscriptionActivatingSubscriber { }); } - protected boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { + protected synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { CanonicalSubscription existingSubscription = mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement()); CanonicalSubscription newSubscription = mySubscriptionInterceptor.canonicalize(theSubscription); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java index 37ed373598c..4a1cbd8167a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionDeliveringRestHookSubscriber.java @@ -108,6 +108,10 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe operation.encoded(thePayloadType); } + // FIXME: remove + ourLog.info("** This " + this + " Processing delivery message " + theMsg); + + ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), thePayloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue()); try { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java index 83ff6a3257d..17f597d548b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java @@ -22,15 +22,21 @@ package ca.uhn.fhir.jpa.subscription.resthook; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.messaging.MessageHandler; import java.util.Optional; public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor { + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRestHookInterceptor.class); @Override protected Optional createDeliveryHandler(CanonicalSubscription theSubscription) { - return Optional.of(new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this)); + SubscriptionDeliveringRestHookSubscriber value = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this); + // FIXME: remove + ourLog.info("** Creating delivery subscriber " + value + " for " + theSubscription.getIdElementString()); + return Optional.of(value); } @Override diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java index ce3510616d8..89b114a0724 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestR4Config.java @@ -38,6 +38,7 @@ public class TestR4Config extends BaseJavaConfigR4 { * starvation */ ourMaxThreads = (int) (Math.random() * 6.0) + 1; + ourMaxThreads = 1; } private Exception myLastStackTrace;