Allow subscription triggering on a block of resources

This commit is contained in:
James Agnew 2018-10-16 15:29:41 -04:00
parent a0333a4767
commit 755060f91e
10 changed files with 599 additions and 49 deletions

View File

@ -442,8 +442,6 @@ public class FhirContext {
* *
* @throws DataFormatException If the resource name is not known * @throws DataFormatException If the resource name is not known
*/ */
// Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException being
// thrown by this method, don't change that.
public RuntimeResourceDefinition getResourceDefinition(String theResourceName) throws DataFormatException { public RuntimeResourceDefinition getResourceDefinition(String theResourceName) throws DataFormatException {
validateInitialized(); validateInitialized();
Validate.notBlank(theResourceName, "theResourceName must not be blank"); Validate.notBlank(theResourceName, "theResourceName must not be blank");
@ -454,6 +452,10 @@ public class FhirContext {
if (retVal == null) { if (retVal == null) {
Class<? extends IBaseResource> clazz = myNameToResourceType.get(resourceName.toLowerCase()); Class<? extends IBaseResource> clazz = myNameToResourceType.get(resourceName.toLowerCase());
if (clazz == null) { if (clazz == null) {
// ***********************************************************************
// Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException
// being thrown by this method, don't change that.
// ***********************************************************************
throw new DataFormatException(createUnknownResourceNameError(theResourceName, myVersion.getVersion())); throw new DataFormatException(createUnknownResourceNameError(theResourceName, myVersion.getVersion()));
} }
if (IBaseResource.class.isAssignableFrom(clazz)) { if (IBaseResource.class.isAssignableFrom(clazz)) {

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.util;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,6 +22,7 @@ package ca.uhn.fhir.util;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -35,7 +36,7 @@ public class DatatypeUtil {
HashSet<String> retVal = new HashSet<>(); HashSet<String> retVal = new HashSet<>();
if (theStringList != null) { if (theStringList != null) {
for (IPrimitiveType<?> string : theStringList) { for (IPrimitiveType<?> string : theStringList) {
if (string != null && string.getValue()!=null) { if (string != null && string.getValue() != null) {
retVal.add(string.getValueAsString()); retVal.add(string.getValueAsString());
} }
} }
@ -44,7 +45,7 @@ public class DatatypeUtil {
} }
/** /**
* Joins a list of strings with a single space (' ') between each string * Joins a list of strings with a single space (' ') between each string
*/ */
public static String joinStringsSpaceSeparated(List<? extends IPrimitiveType<String>> theStrings) { public static String joinStringsSpaceSeparated(List<? extends IPrimitiveType<String>> theStrings) {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();

View File

@ -832,6 +832,25 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return processMatchUrl(theMatchUrl, getResourceType()); return processMatchUrl(theMatchUrl, getResourceType());
} }
@Override
public IBaseResource readByPid(Long thePid) {
StopWatch w = new StopWatch();
Optional<ResourceTable> entity = myResourceTableDao.findById(thePid);
if (!entity.isPresent()) {
throw new ResourceNotFoundException("No resource found with PID " + thePid);
}
if (entity.get().getDeleted() != null) {
throw new ResourceGoneException("Resource was deleted at " + new InstantType(entity.get().getDeleted()).getValueAsString());
}
T retVal = toResource(myResourceType, entity.get(), null, false);
ourLog.debug("Processed read on {} in {}ms", thePid, w.getMillis());
return retVal;
}
@Override @Override
public T read(IIdType theId) { public T read(IIdType theId) {
return read(theId, null); return read(theId, null);

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -158,6 +158,11 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
*/ */
T read(IIdType theId); T read(IIdType theId);
/**
* Read a resource by its internal PID
*/
IBaseResource readByPid(Long thePid);
/** /**
* @param theId * @param theId
* @param theRequestDetails TODO * @param theRequestDetails TODO
@ -239,6 +244,7 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
RuntimeResourceDefinition validateCriteriaAndReturnResourceDefinition(String criteria); RuntimeResourceDefinition validateCriteriaAndReturnResourceDefinition(String criteria);
// /** // /**
// * Invoke the everything operation // * Invoke the everything operation
// */ // */

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.provider;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,64 +21,163 @@ package ca.uhn.fhir.jpa.provider;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.JpaConstants; import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam; import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.UriParam; import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.ParametersUtil; import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.ValidateUtil; import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.IdType; import org.hl7.fhir.instance.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List; import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class SubscriptionTriggeringProvider implements IResourceProvider { import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware {
public static final String RESOURCE_ID = "resourceId"; public static final String RESOURCE_ID = "resourceId";
public static final int DEFAULT_MAX_SUBMIT = 10000;
public static final String SEARCH_URL = "searchUrl";
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
@Autowired @Autowired
private FhirContext myFhirContext; private FhirContext myFhirContext;
@Autowired @Autowired
private IFhirSystemDao<?,?> mySystemDao; private DaoRegistry myDaoRegistry;
@Autowired(required = false)
private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList; private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
@Autowired
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
private ApplicationContext myAppCtx;
@Operation(name= JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) /**
* Sets the maximum number of resources that will be submitted in a single pass
*/
public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
Integer maxSubmitPerPass = theMaxSubmitPerPass;
if (maxSubmitPerPass == null) {
maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
}
Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
myMaxSubmitPerPass = maxSubmitPerPass;
}
@SuppressWarnings("unchecked")
@PostConstruct
public void start() {
mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
mySubscriptionInterceptorList = new ArrayList<>();
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
mySubscriptionInterceptorList.addAll(values);
}
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
public IBaseParameters triggerSubscription(
@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
) {
return doTriggerSubscription(theResourceIds, theSearchUrls, null);
}
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
public IBaseParameters triggerSubscription( public IBaseParameters triggerSubscription(
@IdParam IIdType theSubscriptionId, @IdParam IIdType theSubscriptionId,
@OperationParam(name= RESOURCE_ID) UriParam theResourceId) { @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
) {
ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided"); // Throw a 404 if the subscription doesn't exist
IdType resourceId = new IdType(theResourceId.getValue()); IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type"); IIdType subscriptionId = theSubscriptionId;
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part"); if (subscriptionId.hasResourceType() == false) {
subscriptionId = subscriptionId.withResourceType("Subscription");
}
subscriptionDao.read(subscriptionId);
Class<? extends IBaseResource> resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass(); return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId);
IFhirResourceDao<? extends IBaseResource> dao = mySystemDao.getDao(resourceType);
IBaseResource resourceToTrigger = dao.read(resourceId);
ResourceModifiedMessage msg = new ResourceModifiedMessage(); }
msg.setId(resourceToTrigger.getIdElement());
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
msg.setNewPayload(myFhirContext, resourceToTrigger);
for (BaseSubscriptionInterceptor<?> next :mySubscriptionInterceptorList) { private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
next.submitResourceModified(msg); if (mySubscriptionInterceptorList.isEmpty()) {
throw new PreconditionFailedException("Subscription processing not active on this server");
} }
List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
// Make sure we have at least one resource ID or search URL
if (resourceIds.size() == 0 && searchUrls.size() == 0) {
throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
}
// Resource URLs must be compete
for (UriParam next : resourceIds) {
IdType resourceId = new IdType(next.getValue());
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
}
// Search URLs must be valid
for (StringParam next : searchUrls) {
if (next.getValue().contains("?") == false) {
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
}
}
SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
jobDetails.setJobId(UUID.randomUUID().toString());
jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
if (theSubscriptionId != null) {
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
}
// Submit job for processing
synchronized (myActiveJobs) {
myActiveJobs.add(jobDetails);
}
// Create a parameters response
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext); IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance(); IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
value.setValueAsString("Triggered resource " + theResourceId.getValue() + " for subscription"); value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value); ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
return retVal; return retVal;
} }
@ -87,4 +186,210 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
public Class<? extends IBaseResource> getResourceType() { public Class<? extends IBaseResource> getResourceType() {
return myFhirContext.getResourceDefinition("Subscription").getImplementingClass(); return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
} }
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
public void runDeliveryPass() {
synchronized (myActiveJobs) {
if (myActiveJobs.isEmpty()) {
return;
}
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
runJob(activeJob);
// If the job is complete, remove it from the queue
if (activeJob.getRemainingResourceIds().isEmpty()) {
if (activeJob.getRemainingSearchUrls().isEmpty()) {
if (isBlank(activeJob.myCurrentSearchUuid)) {
myActiveJobs.remove(0);
String remainingJobsMsg = "";
if (myActiveJobs.size() > 0) {
remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
}
ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
}
}
}
}
}
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch();
// Submit individual resources
int totalSubmitted = 0;
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
totalSubmitted++;
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
}
// If we don't have an active search started, and one needs to be.. start it
if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl);
String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
String resourceType = resourceDef.getName();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
SearchParameterMap params = BaseHapiFhirDao.translateMatchUrl(callingDao, myFhirContext, queryPart, resourceDef);
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective());
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());
}
// If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = 0;
if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) {
fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
}
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
int toIndex = fromIndex + maxQuerySize;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid());
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next);
submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted++;
theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1);
}
int expectedCount = toIndex - fromIndex;
if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(null);
theJobDetails.setCurrentSearchCount(null);
}
}
ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
}
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId);
submitResource(theSubscriptionId, resourceToTrigger);
}
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResourceToTrigger.getIdElement());
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
msg.setNewPayload(myFhirContext, theResourceToTrigger);
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
next.submitResourceModified(msg);
}
}
public void cancelAll() {
synchronized (myActiveJobs) {
myActiveJobs.clear();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
myAppCtx = applicationContext;
}
private static class SubscriptionTriggeringJobDetails {
private String myJobId;
private String mySubscriptionId;
private List<String> myRemainingResourceIds;
private List<String> myRemainingSearchUrls;
private String myCurrentSearchUuid;
private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType;
private Integer myCurrentSearchLastUploadedIndex;
public Integer getCurrentSearchCount() {
return myCurrentSearchCount;
}
public void setCurrentSearchCount(Integer theCurrentSearchCount) {
myCurrentSearchCount = theCurrentSearchCount;
}
public String getCurrentSearchResourceType() {
return myCurrentSearchResourceType;
}
public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
myCurrentSearchResourceType = theCurrentSearchResourceType;
}
public String getJobId() {
return myJobId;
}
public void setJobId(String theJobId) {
myJobId = theJobId;
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
public List<String> getRemainingResourceIds() {
return myRemainingResourceIds;
}
public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
myRemainingResourceIds = theRemainingResourceIds;
}
public List<String> getRemainingSearchUrls() {
return myRemainingSearchUrls;
}
public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
myRemainingSearchUrls = theRemainingSearchUrls;
}
public String getCurrentSearchUuid() {
return myCurrentSearchUuid;
}
public void setCurrentSearchUuid(String theCurrentSearchUuid) {
myCurrentSearchUuid = theCurrentSearchUuid;
}
public Integer getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}
public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}
} }

View File

@ -117,7 +117,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
results = query.getResultList(); results = query.getResultList();
ArrayList<IBaseResource> retVal = new ArrayList<IBaseResource>(); ArrayList<IBaseResource> retVal = new ArrayList<>();
for (ResourceHistoryTable next : results) { for (ResourceHistoryTable next : results) {
BaseHasResource resource; BaseHasResource resource;
resource = next; resource = next;

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.*; import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.parser.DataFormatException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
@ -68,7 +69,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
private void refreshNow(WarmCacheEntry theCacheEntry) { private void refreshNow(WarmCacheEntry theCacheEntry) {
String nextUrl = theCacheEntry.getUrl(); String nextUrl = theCacheEntry.getUrl();
RuntimeResourceDefinition resourceDef = parseWarmUrlResourceType(nextUrl); RuntimeResourceDefinition resourceDef = parseUrlResourceType(myCtx, nextUrl);
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceDef.getName()); IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceDef.getName());
String queryPart = parseWarmUrlParamPart(nextUrl); String queryPart = parseWarmUrlParamPart(nextUrl);
SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef); SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef);
@ -84,14 +85,18 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
return theNextUrl.substring(paramIndex); return theNextUrl.substring(paramIndex);
} }
private RuntimeResourceDefinition parseWarmUrlResourceType(String theNextUrl) { /**
int paramIndex = theNextUrl.indexOf('?'); * TODO: this method probably belongs in a utility class, not here
String resourceName = theNextUrl.substring(0, paramIndex); *
* @throws DataFormatException If the resource type is not known
*/
public static RuntimeResourceDefinition parseUrlResourceType(FhirContext theCtx, String theUrl) throws DataFormatException {
int paramIndex = theUrl.indexOf('?');
String resourceName = theUrl.substring(0, paramIndex);
if (resourceName.contains("/")) { if (resourceName.contains("/")) {
resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1); resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1);
} }
RuntimeResourceDefinition resourceDef = myCtx.getResourceDefinition(resourceName); return theCtx.getResourceDefinition(resourceName);
return resourceDef;
} }
@PostConstruct @PostConstruct
@ -107,7 +112,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
// Validate // Validate
parseWarmUrlParamPart(next.getUrl()); parseWarmUrlParamPart(next.getUrl());
parseWarmUrlResourceType(next.getUrl()); parseUrlResourceType(myCtx, next.getUrl());
myCacheEntryToNextRefresh.put(next, 0L); myCacheEntryToNextRefresh.put(next, 0L);
} }

View File

@ -401,14 +401,14 @@ public abstract class BaseJpaTest {
public static void waitForSize(int theTarget, List<?> theList) { public static void waitForSize(int theTarget, List<?> theList) {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
while (theList.size() != theTarget && sw.getMillis() <= 15000) { while (theList.size() != theTarget && sw.getMillis() <= 16000) {
try { try {
Thread.sleep(50); Thread.sleep(50);
} catch (InterruptedException theE) { } catch (InterruptedException theE) {
throw new Error(theE); throw new Error(theE);
} }
} }
if (sw.getMillis() >= 15000) { if (sw.getMillis() >= 16000) {
String describeResults = theList String describeResults = theList
.stream() .stream()
.map(t -> { .map(t -> {

View File

@ -66,6 +66,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc; protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
private static Server ourServer; private static Server ourServer;
private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider; private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider;
protected static SubscriptionTriggeringProvider ourSubscriptionTriggeringProvider;
public BaseResourceProviderDstu3Test() { public BaseResourceProviderDstu3Test() {
super(); super();
@ -160,6 +161,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
ourRestHookSubscriptionInterceptor = wac.getBean(SubscriptionRestHookInterceptor.class); ourRestHookSubscriptionInterceptor = wac.getBean(SubscriptionRestHookInterceptor.class);
ourEmailSubscriptionInterceptor = wac.getBean(SubscriptionEmailInterceptor.class); ourEmailSubscriptionInterceptor = wac.getBean(SubscriptionEmailInterceptor.class);
ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class);
ourSubscriptionTriggeringProvider = wac.getBean(SubscriptionTriggeringProvider.class);
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase); ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase);

View File

@ -13,6 +13,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.PortUtil; import ca.uhn.fhir.util.PortUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -27,7 +28,10 @@ import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** /**
* Test the rest-hook subscriptions * Test the rest-hook subscriptions
@ -36,12 +40,14 @@ import static org.junit.Assert.assertEquals;
public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Test { public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTriggeringDstu3Test.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTriggeringDstu3Test.class);
private static List<Observation> ourCreatedObservations = Lists.newArrayList();
private static int ourListenerPort; private static int ourListenerPort;
private static RestfulServer ourListenerRestServer; private static RestfulServer ourListenerRestServer;
private static Server ourListenerServer; private static Server ourListenerServer;
private static String ourListenerServerBase; private static String ourListenerServerBase;
private static List<Observation> ourCreatedObservations = Lists.newArrayList();
private static List<Observation> ourUpdatedObservations = Lists.newArrayList(); private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
private static List<Patient> ourCreatedPatients = Lists.newArrayList();
private static List<Patient> ourUpdatedPatients = Lists.newArrayList();
private static List<String> ourContentTypes = new ArrayList<>(); private static List<String> ourContentTypes = new ArrayList<>();
private List<IIdType> mySubscriptionIds = new ArrayList<>(); private List<IIdType> mySubscriptionIds = new ArrayList<>();
@ -63,6 +69,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor);
ourSubscriptionTriggeringProvider.cancelAll();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null);
} }
@Before @Before
@ -70,10 +78,15 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor);
} }
/**
* Only do counter resets here! We call this inside tests
*/
@Before @Before
public void beforeReset() { public void beforeReset() {
ourCreatedObservations.clear(); ourCreatedObservations.clear();
ourUpdatedObservations.clear(); ourUpdatedObservations.clear();
ourCreatedPatients.clear();
ourUpdatedPatients.clear();
ourContentTypes.clear(); ourContentTypes.clear();
} }
@ -90,7 +103,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
subscription.setChannel(channel); subscription.setChannel(channel);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart()); subscription.setId(methodOutcome.getId());
mySubscriptionIds.add(methodOutcome.getId()); mySubscriptionIds.add(methodOutcome.getId());
waitForQueueToDrain(); waitForQueueToDrain();
@ -116,6 +129,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
return observation; return observation;
} }
@Test @Test
public void testTriggerResourceToSpecificSubscription() throws Exception { public void testTriggerResourceToSpecificSubscription() throws Exception {
String payload = "application/fhir+json"; String payload = "application/fhir+json";
@ -145,7 +159,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
.execute(); .execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue(); String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
waitForQueueToDrain(); waitForQueueToDrain();
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
@ -153,6 +167,173 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
} }
@Test
public void testTriggerUsingMultipleSearches() throws Exception {
String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
// Create lots
for (int i = 0; i < 50; i++) {
Patient p = new Patient();
p.addName().setFamily("P" + i);
ourClient.create().resource(p).execute();
}
for (int i = 0; i < 50; i++) {
Observation o = new Observation();
o.setId("O" + i);
o.setStatus(Observation.ObservationStatus.FINAL);
o.getCode().setText("O" + i);
ourClient.update().resource(o).execute();
}
waitForSize(50, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(50, ourUpdatedPatients);
beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33);
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient
.operation()
.onInstance(sub1id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?"))
.andParameter(SubscriptionTriggeringProvider.RESOURCE_ID, new UriType("Observation/O2"))
.execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
response = ourClient
.operation()
.onInstance(sub2id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?"))
.execute();
responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
waitForSize(51, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(50, ourUpdatedPatients);
}
@Test
public void testTriggerUsingSearchesWithCount() throws Exception {
String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
// Create lots
for (int i = 0; i < 50; i++) {
Patient p = new Patient();
p.addName().setFamily("P" + i);
ourClient.create().resource(p).execute();
}
for (int i = 0; i < 50; i++) {
Observation o = new Observation();
o.setId("O" + i);
o.setStatus(Observation.ObservationStatus.FINAL);
o.getCode().setText("O" + i);
ourClient.update().resource(o).execute();
}
waitForSize(50, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(50, ourUpdatedPatients);
beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33);
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient
.operation()
.onInstance(sub1id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?_count=10"))
.execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
response = ourClient
.operation()
.onInstance(sub2id)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?_count=16"))
.execute();
responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
waitForSize(10, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(16, ourUpdatedPatients);
}
@Test
public void testTriggerUsingInvalidSearchUrl() {
try {
ourClient
.operation()
.onType(Subscription.class)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation"))
.execute();
fail();
} catch (InvalidRequestException e) {
assertEquals("HTTP 400 Bad Request: Search URL is not valid (must be in the form \"[resource type]?[optional params]\")", e.getMessage());
}
}
@Test
public void testTriggerAllSubscriptions() throws Exception {
String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Observation?status=final", payload, ourListenerServerBase).getIdElement();
for (int i = 0; i < 50; i++) {
Observation o = new Observation();
o.setId("O" + i);
o.setStatus(Observation.ObservationStatus.FINAL);
o.getCode().setText("O" + i);
ourClient.update().resource(o).execute();
}
waitForSize(100, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(0, ourUpdatedPatients);
beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33);
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient
.operation()
.onType(Subscription.class)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?"))
.execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
waitForSize(100, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
waitForSize(0, ourUpdatedPatients);
}
@Test @Test
public void testTriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception { public void testTriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception {
String payload = "application/fhir+json"; String payload = "application/fhir+json";
@ -172,8 +353,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
waitForSize(1, ourUpdatedObservations); waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient Parameters response = ourClient
.operation() .operation()
.onInstance(subscriptionId) .onInstance(subscriptionId)
@ -182,7 +361,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
.execute(); .execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue(); String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
waitForQueueToDrain(); waitForQueueToDrain();
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
@ -191,6 +370,11 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
} }
@Override
protected boolean shouldLogClient() {
return false;
}
private void waitForQueueToDrain() throws InterruptedException { private void waitForQueueToDrain() throws InterruptedException {
RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor); RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor);
} }
@ -220,6 +404,31 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
} }
public static class PatientListener implements IResourceProvider {
@Create
public MethodOutcome create(@ResourceParam Patient thePatient, HttpServletRequest theRequest) {
ourLog.info("Received Listener Create");
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourCreatedPatients.add(thePatient);
return new MethodOutcome(new IdType("Patient/1"), true);
}
@Override
public Class<? extends IBaseResource> getResourceType() {
return Patient.class;
}
@Update
public MethodOutcome update(@ResourceParam Patient thePatient, HttpServletRequest theRequest) {
ourUpdatedPatients.add(thePatient);
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedPatients.size());
return new MethodOutcome(new IdType("Patient/1"), false);
}
}
@BeforeClass @BeforeClass
public static void startListenerServer() throws Exception { public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort(); ourListenerPort = PortUtil.findFreePort();
@ -227,7 +436,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener(); ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener); PatientListener ptListener = new PatientListener();
ourListenerRestServer.setResourceProviders(obsListener, ptListener);
ourListenerServer = new Server(ourListenerPort); ourListenerServer = new Server(ourListenerPort);