Merge pull request #1205 from jamesagnew/subscription-interceptor-change

no subs check interceptor calls on DELETE
This commit is contained in:
James Agnew 2019-02-13 22:25:43 -05:00 committed by GitHub
commit 5aadf91a83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 42 additions and 27 deletions

View File

@ -98,9 +98,9 @@ import static org.apache.commons.lang3.StringUtils.*;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -395,11 +395,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
private int doExpungeEverythingQuery(String theQuery) {
StopWatch sw = new StopWatch();
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
if (outcome > 0) {
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
} else {
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
}
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
return outcome;
}

View File

@ -82,7 +82,7 @@ public class ResourceHistoryTable extends BaseHasResource implements Serializabl
public void addTag(ResourceHistoryTag theTag) {
for (ResourceHistoryTag next : getTags()) {
if (next.getTag().equals(theTag)) {
if (next.equals(theTag)) {
return;
}
}

View File

@ -165,14 +165,14 @@ public class ResourceLink extends BaseResourceIndex {
Validate.isTrue(theTargetResourceUrl.hasBaseUrl());
Validate.isTrue(theTargetResourceUrl.hasResourceType());
if (theTargetResourceUrl.hasIdPart()) {
// if (theTargetResourceUrl.hasIdPart()) {
// do nothing
} else {
// } else {
// Must have set an url like http://example.org/something
// We treat 'something' as the resource type because of fix for #659. Prior to #659 fix, 'something' was
// treated as the id and 'example.org' was treated as the resource type
// TODO: log a warning?
}
// }
myTargetResourceType = theTargetResourceUrl.getResourceType();
myTargetResourceUrl = theTargetResourceUrl.getValue();

View File

@ -53,9 +53,10 @@ public class LinkedBlockingQueueSubscribableChannel implements SubscribableChann
StopWatch sw = new StopWatch();
try {
theQueue.put(theRunnable);
} catch (InterruptedException theE) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
" rejected from " + e.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.subscription.module.interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Hook;
import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionMatchResult;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
@ -118,7 +119,21 @@ public class SubscriptionDebugLogInterceptor {
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED)
public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) {
log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString());
String payloadId = null;
String subscriptionId = null;
CanonicalSubscriptionChannelType channelType = null;
String failureString = null;
if (theMessage != null) {
payloadId = theMessage.getPayloadId();
if (theMessage.getSubscription() != null) {
subscriptionId = theMessage.getSubscription().getIdElementString();
channelType = theMessage.getSubscription().getChannelType();
}
}
if (theFailure != null) {
failureString = theFailure.toString();
}
log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", payloadId, subscriptionId, channelType, failureString);
}
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY)

View File

@ -97,7 +97,11 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
operation.encoded(thePayloadType);
}
ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), thePayloadResource.getIdElement().toUnqualified().getValue(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
String payloadId = null;
if (thePayloadResource != null) {
payloadId = thePayloadResource.getIdElement().toUnqualified().getValue();
}
ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
try {
operation.execute();

View File

@ -72,6 +72,17 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
}
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
switch (theMsg.getOperationType()) {
case CREATE:
case UPDATE:
case MANUALLY_TRIGGERED:
break;
case DELETE:
default:
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
// ignore anything else
return;
}
// Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, theMsg)) {
@ -87,18 +98,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
}
private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
switch (theMsg.getOperationType()) {
case CREATE:
case UPDATE:
case MANUALLY_TRIGGERED:
break;
case DELETE:
default:
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
// ignore anything else
return;
}
IIdType resourceId = theMsg.getId(myFhirContext);
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();