FIx up subscription processing

This commit is contained in:
James Agnew 2018-10-17 21:23:52 -04:00
parent de76f37845
commit b78aaea415
7 changed files with 25 additions and 5 deletions

View File

@ -478,6 +478,8 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// FIXME: remove
ourLog.info("** Sending processing message " + theMessage + " for: " + theMessage.getNewPayload(myCtx));
ourLog.trace("Sending resource modified message to processing channel");
getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
}

View File

@ -132,7 +132,7 @@ public class CanonicalSubscription implements Serializable {
return retVal;
}
String getIdElementString() {
public String getIdElementString() {
return myIdElement;
}

View File

@ -48,6 +48,13 @@ public class ResourceDeliveryMessage {
return myOperationType;
}
/**
* Constructor
*/
public ResourceDeliveryMessage() {
super();
}
public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}

View File

@ -69,7 +69,7 @@ public class SubscriptionActivatingSubscriber {
Validate.notNull(theTaskExecutor);
}
public synchronized boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
public boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
// Grab the value for "Subscription.channel.type" so we can see if this
// subscriber applies..
String subscriptionChannelType = myCtx
@ -180,7 +180,7 @@ public class SubscriptionActivatingSubscriber {
}
private synchronized void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -190,7 +190,7 @@ public class SubscriptionActivatingSubscriber {
});
}
protected boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
protected synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
CanonicalSubscription existingSubscription = mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement());
CanonicalSubscription newSubscription = mySubscriptionInterceptor.canonicalize(theSubscription);

View File

@ -108,6 +108,10 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
operation.encoded(thePayloadType);
}
// FIXME: remove
ourLog.info("** This " + this + " Processing delivery message " + theMsg);
ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), thePayloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue());
try {

View File

@ -22,15 +22,21 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHandler;
import java.util.Optional;
public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRestHookInterceptor.class);
@Override
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
return Optional.of(new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this));
SubscriptionDeliveringRestHookSubscriber value = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this);
// FIXME: remove
ourLog.info("** Creating delivery subscriber " + value + " for " + theSubscription.getIdElementString());
return Optional.of(value);
}
@Override

View File

@ -38,6 +38,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
* starvation
*/
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = 1;
}
private Exception myLastStackTrace;