Honour headers in rest hook subscribers

This commit is contained in:
James 2017-08-15 07:42:18 -04:00
parent c9487c974e
commit f04b080d50
13 changed files with 206 additions and 57 deletions

View File

@ -200,7 +200,8 @@ public class FhirTerser {
BaseRuntimeElementCompositeDefinition<?> currentDef = (BaseRuntimeElementCompositeDefinition<?>) def;
Object currentObj = theTarget;
List<String> parts = Arrays.asList(thePath.split("\\."));
List<String> parts = parsePath(currentDef, thePath);
List<T> retVal = getValues(currentDef, currentObj, parts, theWantedType);
if (retVal.isEmpty()) {
return null;
@ -262,16 +263,23 @@ public class FhirTerser {
public <T> List<T> getValues(IBaseResource theResource, String thePath, Class<T> theWantedClass) {
RuntimeResourceDefinition def = myContext.getResourceDefinition(theResource);
List<String> parts = parsePath(def, thePath);
return getValues(def, theResource, parts, theWantedClass);
}
BaseRuntimeElementCompositeDefinition<?> currentDef = def;
Object currentObj = theResource;
private List<String> parsePath(BaseRuntimeElementCompositeDefinition<?> theElementDef, String thePath) {
List<String> parts = Arrays.asList(thePath.split("\\."));
List<String> subList = parts.subList(1, parts.size());
if (subList.size() < 1) {
if (theElementDef instanceof RuntimeResourceDefinition) {
if (parts.size() > 0 && parts.get(0).equals(theElementDef.getName())) {
parts = parts.subList(1, parts.size());
}
}
if (parts.size() < 1) {
throw new ConfigurationException("Invalid path: " + thePath);
}
return getValues(currentDef, currentObj, subList, theWantedClass);
return parts;
}
/**

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.api.IHttpResponse;
import org.apache.commons.lang3.Validate;
/**
* This interceptor adds an arbitrary header to requests made by this client. Both the
@ -46,6 +47,9 @@ public class SimpleRequestHeaderInterceptor implements IClientInterceptor {
/**
* Constructor
*
* @param theHeaderName The header name, e.g. "<code>Authorization</code>"
* @param theHeaderValue The header value, e.g. "<code>Bearer 09uer90uw9yh</code>"
*/
public SimpleRequestHeaderInterceptor(String theHeaderName, String theHeaderValue) {
super();
@ -53,6 +57,25 @@ public class SimpleRequestHeaderInterceptor implements IClientInterceptor {
myHeaderValue = theHeaderValue;
}
/**
* Constructor which takes a complete header including name and value
*
* @param theCompleteHeader The complete header, e.g. "<code>Authorization: Bearer af09ufe90efh</code>". Must not be null or empty.
*/
public SimpleRequestHeaderInterceptor(String theCompleteHeader) {
Validate.notBlank(theCompleteHeader, "theCompleteHeader must not be null");
int colonIdx = theCompleteHeader.indexOf(':');
if (colonIdx != -1) {
setHeaderName(theCompleteHeader.substring(0, colonIdx).trim());
setHeaderValue(theCompleteHeader.substring(colonIdx+1, theCompleteHeader.length()).trim());
} else {
setHeaderName(theCompleteHeader.trim());
setHeaderValue(null);
}
}
public String getHeaderName() {
return myHeaderName;
}

View File

@ -0,0 +1,22 @@
package ca.uhn.fhir.rest.client.interceptor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class SimpleRequestHeaderInterceptorTest {
@Test
public void testParseComnpleteHeader(){
SimpleRequestHeaderInterceptor i = new SimpleRequestHeaderInterceptor("Authorization: Bearer 123");
assertEquals("Authorization", i.getHeaderName());
assertEquals("Bearer 123", i.getHeaderValue());
}
@Test
public void testParseComnpleteHeaderNameOnly(){
SimpleRequestHeaderInterceptor i = new SimpleRequestHeaderInterceptor("Authorization");
assertEquals("Authorization", i.getHeaderName());
assertEquals(null, i.getHeaderValue());
}
}

View File

@ -51,9 +51,12 @@ import java.util.concurrent.*;
public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
static final String SUBSCRIPTION_CRITERIA = "criteria";
static final String SUBSCRIPTION_ENDPOINT = "channel.endpoint";
static final String SUBSCRIPTION_PAYLOAD = "channel.payload";
static final String SUBSCRIPTION_STATUS = "Subscription.status";
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
static final String SUBSCRIPTION_CRITERIA = "Subscription.criteria";
static final String SUBSCRIPTION_ENDPOINT = "Subscription.channel.endpoint";
static final String SUBSCRIPTION_PAYLOAD = "Subscription.channel.payload";
static final String SUBSCRIPTION_HEADER = "Subscription.channel.header";
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
private SubscribableChannel myProcessingChannel;
private ExecutorService myExecutor;
@ -65,12 +68,16 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private BlockingQueue<Runnable> myExecutorQueue;
public abstract Subscription.SubscriptionChannelType getChannelType();
public BlockingQueue<Runnable> getExecutorQueueForUnitTests() {
return myExecutorQueue;
}
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
return myIdToSubscription;
}
public abstract Subscription.SubscriptionChannelType getChannelType();
public SubscribableChannel getProcessingChannel() {
return myProcessingChannel;
}
@ -117,10 +124,6 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
}
}
public BlockingQueue<Runnable> getExecutorQueueForUnitTests() {
return myExecutorQueue;
}
@PostConstruct
public void postConstruct() {
myExecutorQueue = new LinkedBlockingQueue<>(1000);
@ -161,8 +164,6 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
}
protected abstract void registerDeliverySubscriber();
@SuppressWarnings("unused")
@PreDestroy
public void preDestroy() {
@ -174,7 +175,7 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
unregisterDeliverySubscriber();
}
protected abstract void unregisterDeliverySubscriber();
protected abstract void registerDeliverySubscriber();
@Override
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
@ -213,4 +214,6 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
}
});
}
protected abstract void unregisterDeliverySubscriber();
}

View File

@ -31,8 +31,6 @@ import org.springframework.messaging.SubscribableChannel;
import java.util.concurrent.ConcurrentHashMap;
public abstract class BaseSubscriptionSubscriber implements MessageHandler {
static final String SUBSCRIPTION_STATUS = "status";
static final String SUBSCRIPTION_TYPE = "channel.type";
private final IFhirResourceDao mySubscriptionDao;
private final ConcurrentHashMap<String, IBaseResource> myIdToSubscription;
@ -82,7 +80,7 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
* Does this subscription type (e.g. rest hook, websocket, etc) apply to this interceptor?
*/
protected boolean subscriptionTypeApplies(FhirContext theCtx, IBaseResource theSubscription) {
IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, SUBSCRIPTION_TYPE, IPrimitiveType.class);
IPrimitiveType<?> status = theCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class);
boolean subscriptionTypeApplies = false;
if (getChannelType().toCode().equals(status.getValueAsString())) {
subscriptionTypeApplies = true;

View File

@ -45,6 +45,30 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
super(theSubscriptionDao, theIdToSubscription, theChannelType, theProcessingChannel);
}
private void activateAndRegisterSubscriptionIfRequired(ResourceModifiedMessage theMsg) {
FhirContext ctx = getSubscriptionDao().getContext();
IBaseResource subscription = theMsg.getNewPayload();
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
String statusString = status.getValueAsString();
String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
if (requestedStatus.equals(statusString)) {
status.setValueAsString(activeStatus);
ourLog.info("Activating and registering subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
getSubscriptionDao().update(subscription);
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
} else if (activeStatus.equals(statusString)) {
ourLog.info("Registering active subscription {}", subscription.getIdElement().toUnqualified().getValue());
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
} else {
if (getIdToSubscription().containsKey(subscription.getIdElement().getIdPart())) {
ourLog.info("Removing {} subscription {}", statusString, subscription.getIdElement().toUnqualified().getValue());
}
getIdToSubscription().remove(subscription.getIdElement().getIdPart());
}
}
private void handleCreate(ResourceModifiedMessage theMsg) {
if (!theMsg.getId().getResourceType().equals("Subscription")) {
return;
@ -55,23 +79,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
return;
}
FhirContext ctx = getSubscriptionDao().getContext();
IBaseResource subscription = theMsg.getNewPayload();
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
String statusString = status.getValueAsString();
String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
if (requestedStatus.equals(statusString)) {
status.setValueAsString(activeStatus);
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), requestedStatus, activeStatus);
getSubscriptionDao().update(subscription);
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
} else if (activeStatus.equals(statusString)) {
ourLog.info("Newly created active subscription {}", subscription.getIdElement().toUnqualified().getValue());
getIdToSubscription().put(subscription.getIdElement().getIdPart(), subscription);
}
activateAndRegisterSubscriptionIfRequired(theMsg);
}
@Override
@ -108,15 +116,6 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
return;
}
FhirContext ctx = getSubscriptionDao().getContext();
IBaseResource subscription = theMsg.getNewPayload();
IPrimitiveType<?> status = ctx.newTerser().getSingleValueOrNull(subscription, SUBSCRIPTION_STATUS, IPrimitiveType.class);
String statusString = status.getValueAsString();
ourLog.info("Subscription {} has status {}", subscription.getIdElement().toUnqualifiedVersionless().getValue(), statusString);
if (Subscription.SubscriptionStatus.ACTIVE.toCode().equals(statusString)) {
getIdToSubscription().put(theMsg.getId().getIdPart(), theMsg.getNewPayload());
}
activateAndRegisterSubscriptionIfRequired(theMsg);
}
}

View File

@ -79,7 +79,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
}
// see if the criteria matches the created object
ourLog.info("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
ourLog.debug("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
String criteriaResource = nextCriteriaString;
int index = criteriaResource.indexOf("?");
@ -88,7 +88,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
}
if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
ourLog.info("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
ourLog.debug("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
continue;
}

View File

@ -20,12 +20,12 @@ package ca.uhn.fhir.jpa.subscription;
* #L%
*/
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import org.apache.commons.lang3.ObjectUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -37,8 +37,11 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSubscriber {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
@ -61,11 +64,12 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
RestOperationTypeEnum operationType = msg.getOperationType();
IBaseResource subscription = msg.getSubscription();
// Grab the endpoint from the subscription
IPrimitiveType<?> endpoint = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_ENDPOINT, IPrimitiveType.class);
String endpointUrl = endpoint.getValueAsString();
// Grab the payload type (encoding mimetype ) from the subscription
// Grab the payload type (encoding mimetype) from the subscription
IPrimitiveType<?> payload = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_PAYLOAD, IPrimitiveType.class);
String payloadString = payload.getValueAsString();
if (payloadString.contains(";")) {
@ -75,9 +79,18 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
EncodingEnum payloadType = EncodingEnum.forContentType(payloadString);
payloadType = ObjectUtils.defaultIfNull(payloadType, EncodingEnum.XML);
// Create the client request
getContext().getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
IGenericClient client = getContext().newRestfulGenericClient(endpointUrl);
// Additional headers specified in the subscription
List<IPrimitiveType> headers = getContext().newTerser().getValues(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_HEADER, IPrimitiveType.class);
for (IPrimitiveType next : headers) {
if (isNotBlank(next.getValueAsString())) {
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next.getValueAsString()));
}
}
IBaseResource payloadResource = msg.getPayoad();
IClientExecutable<?, ?> operation;

View File

@ -24,8 +24,11 @@ import org.junit.*;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@ -42,6 +45,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
private static String ourListenerServerBase;
private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
private static List<String> ourContentTypes = new ArrayList<>();
private static List<String> ourHeaders = new ArrayList<>();
private List<IIdType> mySubscriptionIds = new ArrayList<>();
@After
@ -71,6 +75,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
ourHeaders.clear();
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
@ -79,11 +84,10 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria(theCriteria);
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(thePayload);
channel.setEndpoint(theEndpoint);
subscription.setChannel(channel);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
@ -315,6 +319,62 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
}
}
@Test
public void testSubscriptionWithHeaders() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
// Add some headers, and we'll also turn back to requested status for fun
Subscription subscription = createSubscription(criteria1, payload, ourListenerServerBase);
subscription.getChannel().addHeader("X-Foo: FOO");
subscription.getChannel().addHeader("X-Bar: BAR");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
ourClient.update().resource(subscription).execute();
waitForQueueToDrain();
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(1, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertThat(ourHeaders, hasItem("X-Foo: FOO"));
assertThat(ourHeaders, hasItem("X-Bar: BAR"));
}
@Test
public void testDisableSubscription() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
Subscription subscription = createSubscription(criteria1, payload, ourListenerServerBase);
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(1, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
// Disable
subscription.setStatus(Subscription.SubscriptionStatus.OFF);
ourClient.update().resource(subscription).execute();
waitForQueueToDrain();
// Send another object
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(1, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
}
private void waitForQueueToDrain() throws InterruptedException {
RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor);
}
@ -353,9 +413,22 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
ourLog.info("Received Listener Create");
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourCreatedObservations.add(theObservation);
extractHeaders(theRequest);
return new MethodOutcome(new IdType("Observation/1"), true);
}
private void extractHeaders(HttpServletRequest theRequest) {
Enumeration<String> headerNamesEnum = theRequest.getHeaderNames();
while (headerNamesEnum.hasMoreElements()) {
String nextName = headerNamesEnum.nextElement();
Enumeration<String> valueEnum = theRequest.getHeaders(nextName);
while (valueEnum.hasMoreElements()) {
String nextValue = valueEnum.nextElement();
ourHeaders.add(nextName + ": " + nextValue);
}
}
}
@Override
public Class<? extends IBaseResource> getResourceType() {
return Observation.class;
@ -366,6 +439,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
ourLog.info("Received Listener Update");
ourUpdatedObservations.add(theObservation);
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
extractHeaders(theRequest);
return new MethodOutcome(new IdType("Observation/1"), false);
}

View File

@ -1,5 +1,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>TRACE</level>

View File

@ -1216,7 +1216,7 @@ public class ClientR4Test {
TestUtil.clearAllStaticFieldsForUnitTest();
}
static String getPatientFeedWithOneResult() {
private static String getPatientFeedWithOneResult() {
return getPatientFeedWithOneResult(ourCtx);
}

View File

@ -111,7 +111,7 @@ public class GenericClientR4Test {
@Test
public void testSearchWithNoExplicitBundleReturnType() throws Exception {
String msg = ClientR4Test.getPatientFeedWithOneResult();
String msg = ClientR4Test.getPatientFeedWithOneResult(ourCtx);
ArgumentCaptor<HttpUriRequest> capt = ArgumentCaptor.forClass(HttpUriRequest.class);

View File

@ -311,6 +311,13 @@
client for some versions of FHIR. Thanks to Clayton Bodendein for the
pull request!
</action>
<action type="add">
Add a new constructor to SimpleRequestHeaderInterceptor which allows a complete header
to be passed in (including name and value in one string)
</action>
<action type="add">
REST Hook subscriptions now honour the Subscription.channel.header field
</action>
</release>
<release version="2.5" date="2017-06-08">
<action type="fix">