Merge branch 'master' into philips-3.6.0
This commit is contained in:
commit
da2763d8c8
|
@ -442,8 +442,6 @@ public class FhirContext {
|
|||
*
|
||||
* @throws DataFormatException If the resource name is not known
|
||||
*/
|
||||
// Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException being
|
||||
// thrown by this method, don't change that.
|
||||
public RuntimeResourceDefinition getResourceDefinition(String theResourceName) throws DataFormatException {
|
||||
validateInitialized();
|
||||
Validate.notBlank(theResourceName, "theResourceName must not be blank");
|
||||
|
@ -454,6 +452,10 @@ public class FhirContext {
|
|||
if (retVal == null) {
|
||||
Class<? extends IBaseResource> clazz = myNameToResourceType.get(resourceName.toLowerCase());
|
||||
if (clazz == null) {
|
||||
// ***********************************************************************
|
||||
// Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException
|
||||
// being thrown by this method, don't change that.
|
||||
// ***********************************************************************
|
||||
throw new DataFormatException(createUnknownResourceNameError(theResourceName, myVersion.getVersion()));
|
||||
}
|
||||
if (IBaseResource.class.isAssignableFrom(clazz)) {
|
||||
|
|
|
@ -22,6 +22,7 @@ package ca.uhn.fhir.util;
|
|||
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -35,7 +36,7 @@ public class DatatypeUtil {
|
|||
HashSet<String> retVal = new HashSet<>();
|
||||
if (theStringList != null) {
|
||||
for (IPrimitiveType<?> string : theStringList) {
|
||||
if (string != null && string.getValue()!=null) {
|
||||
if (string != null && string.getValue() != null) {
|
||||
retVal.add(string.getValueAsString());
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +45,7 @@ public class DatatypeUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Joins a list of strings with a single space (' ') between each string
|
||||
* Joins a list of strings with a single space (' ') between each string
|
||||
*/
|
||||
public static String joinStringsSpaceSeparated(List<? extends IPrimitiveType<String>> theStrings) {
|
||||
StringBuilder b = new StringBuilder();
|
||||
|
|
|
@ -70,7 +70,7 @@ public abstract class BaseConfig implements SchedulingConfigurer {
|
|||
@Autowired
|
||||
protected Environment myEnv;
|
||||
|
||||
@Bean(name="myDaoRegistry")
|
||||
@Bean(name = "myDaoRegistry")
|
||||
public DaoRegistry daoRegistry() {
|
||||
return new DaoRegistry();
|
||||
}
|
||||
|
@ -141,13 +141,13 @@ public abstract class BaseConfig implements SchedulingConfigurer {
|
|||
return b.getObject();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Bean(name="mySubscriptionTriggeringProvider")
|
||||
@Lazy
|
||||
public SubscriptionTriggeringProvider mySubscriptionTriggeringProvider() {
|
||||
public SubscriptionTriggeringProvider subscriptionTriggeringProvider() {
|
||||
return new SubscriptionTriggeringProvider();
|
||||
}
|
||||
|
||||
@Bean(autowire = Autowire.BY_TYPE)
|
||||
@Bean(autowire = Autowire.BY_TYPE, name = "mySearchCoordinatorSvc")
|
||||
public ISearchCoordinatorSvc searchCoordinatorSvc() {
|
||||
return new SearchCoordinatorSvcImpl();
|
||||
}
|
||||
|
|
|
@ -832,6 +832,25 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
return processMatchUrl(theMatchUrl, getResourceType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public IBaseResource readByPid(Long thePid) {
|
||||
StopWatch w = new StopWatch();
|
||||
|
||||
Optional<ResourceTable> entity = myResourceTableDao.findById(thePid);
|
||||
if (!entity.isPresent()) {
|
||||
throw new ResourceNotFoundException("No resource found with PID " + thePid);
|
||||
}
|
||||
if (entity.get().getDeleted() != null) {
|
||||
throw new ResourceGoneException("Resource was deleted at " + new InstantType(entity.get().getDeleted()).getValueAsString());
|
||||
}
|
||||
|
||||
T retVal = toResource(myResourceType, entity.get(), null, false);
|
||||
|
||||
ourLog.debug("Processed read on {} in {}ms", thePid, w.getMillis());
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public T read(IIdType theId) {
|
||||
return read(theId, null);
|
||||
|
|
|
@ -158,6 +158,11 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
|
|||
*/
|
||||
T read(IIdType theId);
|
||||
|
||||
/**
|
||||
* Read a resource by its internal PID
|
||||
*/
|
||||
IBaseResource readByPid(Long thePid);
|
||||
|
||||
/**
|
||||
* @param theId
|
||||
* @param theRequestDetails TODO
|
||||
|
@ -239,6 +244,7 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
|
|||
|
||||
RuntimeResourceDefinition validateCriteriaAndReturnResourceDefinition(String criteria);
|
||||
|
||||
|
||||
// /**
|
||||
// * Invoke the everything operation
|
||||
// */
|
||||
|
|
|
@ -21,64 +21,164 @@ package ca.uhn.fhir.jpa.provider;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
|
||||
import ca.uhn.fhir.jpa.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirSystemDao;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
|
||||
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
|
||||
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
import ca.uhn.fhir.rest.annotation.IdParam;
|
||||
import ca.uhn.fhir.rest.annotation.Operation;
|
||||
import ca.uhn.fhir.rest.annotation.OperationParam;
|
||||
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.param.StringParam;
|
||||
import ca.uhn.fhir.rest.param.UriParam;
|
||||
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||
import ca.uhn.fhir.util.ParametersUtil;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import ca.uhn.fhir.util.ValidateUtil;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.hl7.fhir.instance.model.IdType;
|
||||
import org.hl7.fhir.instance.model.api.IBaseParameters;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SubscriptionTriggeringProvider implements IResourceProvider {
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware {
|
||||
|
||||
public static final String RESOURCE_ID = "resourceId";
|
||||
public static final int DEFAULT_MAX_SUBMIT = 10000;
|
||||
public static final String SEARCH_URL = "searchUrl";
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
|
||||
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
@Autowired
|
||||
private IFhirSystemDao<?,?> mySystemDao;
|
||||
@Autowired(required = false)
|
||||
private DaoRegistry myDaoRegistry;
|
||||
private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
|
||||
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
|
||||
@Autowired
|
||||
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||
private ApplicationContext myAppCtx;
|
||||
|
||||
@Operation(name= JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
/**
|
||||
* Sets the maximum number of resources that will be submitted in a single pass
|
||||
*/
|
||||
public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
|
||||
Integer maxSubmitPerPass = theMaxSubmitPerPass;
|
||||
if (maxSubmitPerPass == null) {
|
||||
maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
|
||||
}
|
||||
Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
|
||||
myMaxSubmitPerPass = maxSubmitPerPass;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
|
||||
mySubscriptionInterceptorList = new ArrayList<>();
|
||||
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
|
||||
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
|
||||
mySubscriptionInterceptorList.addAll(values);
|
||||
}
|
||||
|
||||
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
public IBaseParameters triggerSubscription(
|
||||
@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
|
||||
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
|
||||
) {
|
||||
return doTriggerSubscription(theResourceIds, theSearchUrls, null);
|
||||
}
|
||||
|
||||
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
public IBaseParameters triggerSubscription(
|
||||
@IdParam IIdType theSubscriptionId,
|
||||
@OperationParam(name= RESOURCE_ID) UriParam theResourceId) {
|
||||
@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
|
||||
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
|
||||
) {
|
||||
|
||||
ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided");
|
||||
IdType resourceId = new IdType(theResourceId.getValue());
|
||||
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
|
||||
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
|
||||
// Throw a 404 if the subscription doesn't exist
|
||||
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
|
||||
IIdType subscriptionId = theSubscriptionId;
|
||||
if (subscriptionId.hasResourceType() == false) {
|
||||
subscriptionId = subscriptionId.withResourceType("Subscription");
|
||||
}
|
||||
subscriptionDao.read(subscriptionId);
|
||||
|
||||
Class<? extends IBaseResource> resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass();
|
||||
IFhirResourceDao<? extends IBaseResource> dao = mySystemDao.getDao(resourceType);
|
||||
IBaseResource resourceToTrigger = dao.read(resourceId);
|
||||
return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId);
|
||||
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(resourceToTrigger.getIdElement());
|
||||
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
|
||||
msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
|
||||
msg.setNewPayload(myFhirContext, resourceToTrigger);
|
||||
}
|
||||
|
||||
for (BaseSubscriptionInterceptor<?> next :mySubscriptionInterceptorList) {
|
||||
next.submitResourceModified(msg);
|
||||
private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
|
||||
if (mySubscriptionInterceptorList.isEmpty()) {
|
||||
throw new PreconditionFailedException("Subscription processing not active on this server");
|
||||
}
|
||||
|
||||
List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
|
||||
List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
|
||||
|
||||
// Make sure we have at least one resource ID or search URL
|
||||
if (resourceIds.size() == 0 && searchUrls.size() == 0) {
|
||||
throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
|
||||
}
|
||||
|
||||
// Resource URLs must be compete
|
||||
for (UriParam next : resourceIds) {
|
||||
IdType resourceId = new IdType(next.getValue());
|
||||
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
|
||||
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
|
||||
}
|
||||
|
||||
// Search URLs must be valid
|
||||
for (StringParam next : searchUrls) {
|
||||
if (next.getValue().contains("?") == false) {
|
||||
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
|
||||
}
|
||||
}
|
||||
|
||||
SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
|
||||
jobDetails.setJobId(UUID.randomUUID().toString());
|
||||
jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
|
||||
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
|
||||
if (theSubscriptionId != null) {
|
||||
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
|
||||
}
|
||||
|
||||
// Submit job for processing
|
||||
synchronized (myActiveJobs) {
|
||||
myActiveJobs.add(jobDetails);
|
||||
}
|
||||
ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId());
|
||||
|
||||
// Create a parameters response
|
||||
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
|
||||
IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
|
||||
value.setValueAsString("Triggered resource " + theResourceId.getValue() + " for subscription");
|
||||
value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
|
||||
ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
|
||||
return retVal;
|
||||
}
|
||||
|
@ -87,4 +187,214 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
|
|||
public Class<? extends IBaseResource> getResourceType() {
|
||||
return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
|
||||
public void runDeliveryPass() {
|
||||
|
||||
synchronized (myActiveJobs) {
|
||||
if (myActiveJobs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
|
||||
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
|
||||
|
||||
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
|
||||
|
||||
runJob(activeJob);
|
||||
|
||||
// If the job is complete, remove it from the queue
|
||||
if (activeJob.getRemainingResourceIds().isEmpty()) {
|
||||
if (activeJob.getRemainingSearchUrls().isEmpty()) {
|
||||
if (isBlank(activeJob.myCurrentSearchUuid)) {
|
||||
myActiveJobs.remove(0);
|
||||
String remainingJobsMsg = "";
|
||||
if (myActiveJobs.size() > 0) {
|
||||
remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
|
||||
}
|
||||
ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
|
||||
StopWatch sw = new StopWatch();
|
||||
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
|
||||
|
||||
// Submit individual resources
|
||||
int totalSubmitted = 0;
|
||||
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
|
||||
totalSubmitted++;
|
||||
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
|
||||
submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
|
||||
}
|
||||
|
||||
// If we don't have an active search started, and one needs to be.. start it
|
||||
if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
|
||||
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
|
||||
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl);
|
||||
String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
|
||||
String resourceType = resourceDef.getName();
|
||||
|
||||
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
|
||||
SearchParameterMap params = BaseHapiFhirDao.translateMatchUrl(callingDao, myFhirContext, queryPart, resourceDef);
|
||||
|
||||
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
|
||||
|
||||
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective());
|
||||
theJobDetails.setCurrentSearchUuid(search.getUuid());
|
||||
theJobDetails.setCurrentSearchResourceType(resourceType);
|
||||
theJobDetails.setCurrentSearchCount(params.getCount());
|
||||
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
|
||||
}
|
||||
|
||||
// If we have an active search going, submit resources from it
|
||||
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
|
||||
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
|
||||
|
||||
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
|
||||
|
||||
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
|
||||
int toIndex = fromIndex + maxQuerySize;
|
||||
if (theJobDetails.getCurrentSearchCount() != null) {
|
||||
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
|
||||
}
|
||||
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
||||
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
||||
|
||||
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
||||
for (Long next : resourceIds) {
|
||||
IBaseResource nextResource = resourceDao.readByPid(next);
|
||||
submitResource(theJobDetails.getSubscriptionId(), nextResource);
|
||||
totalSubmitted++;
|
||||
theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1);
|
||||
}
|
||||
|
||||
int expectedCount = toIndex - fromIndex;
|
||||
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
|
||||
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
|
||||
theJobDetails.setCurrentSearchResourceType(null);
|
||||
theJobDetails.setCurrentSearchUuid(null);
|
||||
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
|
||||
theJobDetails.setCurrentSearchCount(null);
|
||||
}
|
||||
}
|
||||
|
||||
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
|
||||
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
|
||||
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
|
||||
IBaseResource resourceToTrigger = dao.read(resourceId);
|
||||
|
||||
submitResource(theSubscriptionId, resourceToTrigger);
|
||||
}
|
||||
|
||||
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
|
||||
|
||||
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
|
||||
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theResourceToTrigger.getIdElement());
|
||||
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
|
||||
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
|
||||
msg.setNewPayload(myFhirContext, theResourceToTrigger);
|
||||
|
||||
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
|
||||
next.submitResourceModified(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public void cancelAll() {
|
||||
synchronized (myActiveJobs) {
|
||||
myActiveJobs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
myAppCtx = applicationContext;
|
||||
}
|
||||
|
||||
private static class SubscriptionTriggeringJobDetails {
|
||||
|
||||
private String myJobId;
|
||||
private String mySubscriptionId;
|
||||
private List<String> myRemainingResourceIds;
|
||||
private List<String> myRemainingSearchUrls;
|
||||
private String myCurrentSearchUuid;
|
||||
private Integer myCurrentSearchCount;
|
||||
private String myCurrentSearchResourceType;
|
||||
private int myCurrentSearchLastUploadedIndex;
|
||||
|
||||
public Integer getCurrentSearchCount() {
|
||||
return myCurrentSearchCount;
|
||||
}
|
||||
|
||||
public void setCurrentSearchCount(Integer theCurrentSearchCount) {
|
||||
myCurrentSearchCount = theCurrentSearchCount;
|
||||
}
|
||||
|
||||
public String getCurrentSearchResourceType() {
|
||||
return myCurrentSearchResourceType;
|
||||
}
|
||||
|
||||
public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
|
||||
myCurrentSearchResourceType = theCurrentSearchResourceType;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return myJobId;
|
||||
}
|
||||
|
||||
public void setJobId(String theJobId) {
|
||||
myJobId = theJobId;
|
||||
}
|
||||
|
||||
public String getSubscriptionId() {
|
||||
return mySubscriptionId;
|
||||
}
|
||||
|
||||
public void setSubscriptionId(String theSubscriptionId) {
|
||||
mySubscriptionId = theSubscriptionId;
|
||||
}
|
||||
|
||||
public List<String> getRemainingResourceIds() {
|
||||
return myRemainingResourceIds;
|
||||
}
|
||||
|
||||
public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
|
||||
myRemainingResourceIds = theRemainingResourceIds;
|
||||
}
|
||||
|
||||
public List<String> getRemainingSearchUrls() {
|
||||
return myRemainingSearchUrls;
|
||||
}
|
||||
|
||||
public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
|
||||
myRemainingSearchUrls = theRemainingSearchUrls;
|
||||
}
|
||||
|
||||
public String getCurrentSearchUuid() {
|
||||
return myCurrentSearchUuid;
|
||||
}
|
||||
|
||||
public void setCurrentSearchUuid(String theCurrentSearchUuid) {
|
||||
myCurrentSearchUuid = theCurrentSearchUuid;
|
||||
}
|
||||
|
||||
public int getCurrentSearchLastUploadedIndex() {
|
||||
return myCurrentSearchLastUploadedIndex;
|
||||
}
|
||||
|
||||
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
|
||||
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
|
|||
|
||||
results = query.getResultList();
|
||||
|
||||
ArrayList<IBaseResource> retVal = new ArrayList<IBaseResource>();
|
||||
ArrayList<IBaseResource> retVal = new ArrayList<>();
|
||||
for (ResourceHistoryTable next : results) {
|
||||
BaseHasResource resource;
|
||||
resource = next;
|
||||
|
@ -157,7 +157,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
|
|||
|
||||
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
||||
txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
|
||||
txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
|
||||
return txTemplate.execute(s -> {
|
||||
try {
|
||||
setSearchEntity(mySearchDao.findByUuid(myUuid));
|
||||
|
|
|
@ -634,7 +634,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
|
||||
if (myCustomIsolationSupported) {
|
||||
txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
|
||||
txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
|
||||
}
|
||||
|
||||
txTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
|
|
|
@ -24,6 +24,7 @@ import ca.uhn.fhir.context.ConfigurationException;
|
|||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.jpa.dao.*;
|
||||
import ca.uhn.fhir.parser.DataFormatException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
|
@ -68,7 +69,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
|
|||
private void refreshNow(WarmCacheEntry theCacheEntry) {
|
||||
String nextUrl = theCacheEntry.getUrl();
|
||||
|
||||
RuntimeResourceDefinition resourceDef = parseWarmUrlResourceType(nextUrl);
|
||||
RuntimeResourceDefinition resourceDef = parseUrlResourceType(myCtx, nextUrl);
|
||||
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceDef.getName());
|
||||
String queryPart = parseWarmUrlParamPart(nextUrl);
|
||||
SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef);
|
||||
|
@ -84,14 +85,18 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
|
|||
return theNextUrl.substring(paramIndex);
|
||||
}
|
||||
|
||||
private RuntimeResourceDefinition parseWarmUrlResourceType(String theNextUrl) {
|
||||
int paramIndex = theNextUrl.indexOf('?');
|
||||
String resourceName = theNextUrl.substring(0, paramIndex);
|
||||
/**
|
||||
* TODO: this method probably belongs in a utility class, not here
|
||||
*
|
||||
* @throws DataFormatException If the resource type is not known
|
||||
*/
|
||||
public static RuntimeResourceDefinition parseUrlResourceType(FhirContext theCtx, String theUrl) throws DataFormatException {
|
||||
int paramIndex = theUrl.indexOf('?');
|
||||
String resourceName = theUrl.substring(0, paramIndex);
|
||||
if (resourceName.contains("/")) {
|
||||
resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1);
|
||||
}
|
||||
RuntimeResourceDefinition resourceDef = myCtx.getResourceDefinition(resourceName);
|
||||
return resourceDef;
|
||||
return theCtx.getResourceDefinition(resourceName);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
|
@ -107,7 +112,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc {
|
|||
|
||||
// Validate
|
||||
parseWarmUrlParamPart(next.getUrl());
|
||||
parseWarmUrlResourceType(next.getUrl());
|
||||
parseUrlResourceType(myCtx, next.getUrl());
|
||||
|
||||
myCacheEntryToNextRefresh.put(next, 0L);
|
||||
}
|
||||
|
|
|
@ -401,14 +401,14 @@ public abstract class BaseJpaTest {
|
|||
|
||||
public static void waitForSize(int theTarget, List<?> theList) {
|
||||
StopWatch sw = new StopWatch();
|
||||
while (theList.size() != theTarget && sw.getMillis() <= 15000) {
|
||||
while (theList.size() != theTarget && sw.getMillis() <= 16000) {
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException theE) {
|
||||
throw new Error(theE);
|
||||
}
|
||||
}
|
||||
if (sw.getMillis() >= 15000) {
|
||||
if (sw.getMillis() >= 16000) {
|
||||
String describeResults = theList
|
||||
.stream()
|
||||
.map(t -> {
|
||||
|
|
|
@ -66,6 +66,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
|
|||
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||
private static Server ourServer;
|
||||
private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider;
|
||||
protected static SubscriptionTriggeringProvider ourSubscriptionTriggeringProvider;
|
||||
|
||||
public BaseResourceProviderDstu3Test() {
|
||||
super();
|
||||
|
@ -160,6 +161,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
|
|||
ourRestHookSubscriptionInterceptor = wac.getBean(SubscriptionRestHookInterceptor.class);
|
||||
ourEmailSubscriptionInterceptor = wac.getBean(SubscriptionEmailInterceptor.class);
|
||||
ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class);
|
||||
ourSubscriptionTriggeringProvider = wac.getBean(SubscriptionTriggeringProvider.class);
|
||||
|
||||
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
|
||||
ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase);
|
||||
|
|
|
@ -13,6 +13,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
|
|||
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
|
||||
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.util.PortUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
@ -27,7 +28,10 @@ import javax.servlet.http.HttpServletRequest;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Test the rest-hook subscriptions
|
||||
|
@ -36,12 +40,14 @@ import static org.junit.Assert.assertEquals;
|
|||
public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Test {
|
||||
|
||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTriggeringDstu3Test.class);
|
||||
private static List<Observation> ourCreatedObservations = Lists.newArrayList();
|
||||
private static int ourListenerPort;
|
||||
private static RestfulServer ourListenerRestServer;
|
||||
private static Server ourListenerServer;
|
||||
private static String ourListenerServerBase;
|
||||
private static List<Observation> ourCreatedObservations = Lists.newArrayList();
|
||||
private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
|
||||
private static List<Patient> ourCreatedPatients = Lists.newArrayList();
|
||||
private static List<Patient> ourUpdatedPatients = Lists.newArrayList();
|
||||
private static List<String> ourContentTypes = new ArrayList<>();
|
||||
private List<IIdType> mySubscriptionIds = new ArrayList<>();
|
||||
|
||||
|
@ -56,13 +62,17 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
|
||||
myDaoConfig.setAllowMultipleDelete(true);
|
||||
ourLog.info("Deleting all subscriptions");
|
||||
ourClient.delete().resourceConditionalByUrl("Subscription?status=active").execute();
|
||||
ourClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute();
|
||||
ourClient.delete().resourceConditionalByUrl("Subscription?_lastUpdated=lt3000").execute();
|
||||
ourClient.delete().resourceConditionalByUrl("Observation?_lastUpdated=lt3000").execute();
|
||||
ourLog.info("Done deleting all subscriptions");
|
||||
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
||||
|
||||
ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor);
|
||||
|
||||
ourSubscriptionTriggeringProvider.cancelAll();
|
||||
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null);
|
||||
|
||||
myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds());
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -70,10 +80,15 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Only do counter resets here! We call this inside tests
|
||||
*/
|
||||
@Before
|
||||
public void beforeReset() {
|
||||
ourCreatedObservations.clear();
|
||||
ourUpdatedObservations.clear();
|
||||
ourCreatedPatients.clear();
|
||||
ourUpdatedPatients.clear();
|
||||
ourContentTypes.clear();
|
||||
}
|
||||
|
||||
|
@ -90,7 +105,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
subscription.setChannel(channel);
|
||||
|
||||
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
|
||||
subscription.setId(methodOutcome.getId().getIdPart());
|
||||
subscription.setId(methodOutcome.getId());
|
||||
mySubscriptionIds.add(methodOutcome.getId());
|
||||
|
||||
waitForQueueToDrain();
|
||||
|
@ -116,6 +131,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
return observation;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTriggerResourceToSpecificSubscription() throws Exception {
|
||||
String payload = "application/fhir+json";
|
||||
|
@ -135,8 +151,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
waitForSize(1, ourUpdatedObservations);
|
||||
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
|
||||
|
||||
ourClient.registerInterceptor(new LoggingInterceptor(true));
|
||||
|
||||
Parameters response = ourClient
|
||||
.operation()
|
||||
.onInstance(subscriptionId)
|
||||
|
@ -145,7 +159,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
.execute();
|
||||
|
||||
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue);
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
waitForQueueToDrain();
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
|
@ -153,6 +167,171 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerUsingMultipleSearches() throws Exception {
|
||||
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100));
|
||||
|
||||
String payload = "application/fhir+json";
|
||||
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
|
||||
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
|
||||
|
||||
// Create lots
|
||||
for (int i = 0; i < 50; i++) {
|
||||
Patient p = new Patient();
|
||||
p.addName().setFamily("P" + i);
|
||||
ourClient.create().resource(p).execute();
|
||||
}
|
||||
for (int i = 0; i < 50; i++) {
|
||||
Observation o = new Observation();
|
||||
o.setId("O" + i);
|
||||
o.setStatus(Observation.ObservationStatus.FINAL);
|
||||
o.getCode().setText("O" + i);
|
||||
ourClient.update().resource(o).execute();
|
||||
}
|
||||
|
||||
waitForSize(50, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(50, ourUpdatedPatients);
|
||||
beforeReset();
|
||||
|
||||
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33);
|
||||
|
||||
Parameters response = ourClient
|
||||
.operation()
|
||||
.onInstance(sub1id)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?"))
|
||||
.andParameter(SubscriptionTriggeringProvider.RESOURCE_ID, new UriType("Observation/O2"))
|
||||
.execute();
|
||||
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
response = ourClient
|
||||
.operation()
|
||||
.onInstance(sub2id)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?"))
|
||||
.execute();
|
||||
responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
// Thread.sleep(1000000000);
|
||||
|
||||
waitForSize(51, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(50, ourUpdatedPatients);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerUsingSearchesWithCount() throws Exception {
|
||||
String payload = "application/fhir+json";
|
||||
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
|
||||
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
|
||||
|
||||
// Create lots
|
||||
for (int i = 0; i < 50; i++) {
|
||||
Patient p = new Patient();
|
||||
p.addName().setFamily("P" + i);
|
||||
ourClient.create().resource(p).execute();
|
||||
}
|
||||
for (int i = 0; i < 50; i++) {
|
||||
Observation o = new Observation();
|
||||
o.setId("O" + i);
|
||||
o.setStatus(Observation.ObservationStatus.FINAL);
|
||||
o.getCode().setText("O" + i);
|
||||
ourClient.update().resource(o).execute();
|
||||
}
|
||||
|
||||
waitForSize(50, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(50, ourUpdatedPatients);
|
||||
beforeReset();
|
||||
|
||||
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33);
|
||||
|
||||
Parameters response = ourClient
|
||||
.operation()
|
||||
.onInstance(sub1id)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?_count=10"))
|
||||
.execute();
|
||||
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
response = ourClient
|
||||
.operation()
|
||||
.onInstance(sub2id)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?_count=16"))
|
||||
.execute();
|
||||
responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
waitForSize(10, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(16, ourUpdatedPatients);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerUsingInvalidSearchUrl() {
|
||||
|
||||
try {
|
||||
ourClient
|
||||
.operation()
|
||||
.onType(Subscription.class)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation"))
|
||||
.execute();
|
||||
fail();
|
||||
} catch (InvalidRequestException e) {
|
||||
assertEquals("HTTP 400 Bad Request: Search URL is not valid (must be in the form \"[resource type]?[optional params]\")", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerAllSubscriptions() throws Exception {
|
||||
String payload = "application/fhir+json";
|
||||
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
|
||||
IdType sub2id = createSubscription("Observation?status=final", payload, ourListenerServerBase).getIdElement();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Observation o = new Observation();
|
||||
o.setId("O" + i);
|
||||
o.setStatus(Observation.ObservationStatus.FINAL);
|
||||
o.getCode().setText("O" + i);
|
||||
ourClient.update().resource(o).execute();
|
||||
}
|
||||
|
||||
waitForSize(20, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(0, ourUpdatedPatients);
|
||||
beforeReset();
|
||||
|
||||
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(50);
|
||||
|
||||
Parameters response = ourClient
|
||||
.operation()
|
||||
.onType(Subscription.class)
|
||||
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||
.withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?"))
|
||||
.execute();
|
||||
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
waitForSize(20, ourUpdatedObservations);
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(0, ourCreatedPatients);
|
||||
waitForSize(0, ourUpdatedPatients);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception {
|
||||
String payload = "application/fhir+json";
|
||||
|
@ -172,8 +351,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
waitForSize(1, ourUpdatedObservations);
|
||||
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
|
||||
|
||||
ourClient.registerInterceptor(new LoggingInterceptor(true));
|
||||
|
||||
Parameters response = ourClient
|
||||
.operation()
|
||||
.onInstance(subscriptionId)
|
||||
|
@ -182,7 +359,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
.execute();
|
||||
|
||||
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
|
||||
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue);
|
||||
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
|
||||
|
||||
waitForQueueToDrain();
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
|
@ -191,6 +368,11 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean shouldLogClient() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void waitForQueueToDrain() throws InterruptedException {
|
||||
RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor);
|
||||
}
|
||||
|
@ -220,6 +402,31 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
|
||||
}
|
||||
|
||||
public static class PatientListener implements IResourceProvider {
|
||||
|
||||
@Create
|
||||
public MethodOutcome create(@ResourceParam Patient thePatient, HttpServletRequest theRequest) {
|
||||
ourLog.info("Received Listener Create");
|
||||
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
|
||||
ourCreatedPatients.add(thePatient);
|
||||
return new MethodOutcome(new IdType("Patient/1"), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends IBaseResource> getResourceType() {
|
||||
return Patient.class;
|
||||
}
|
||||
|
||||
@Update
|
||||
public MethodOutcome update(@ResourceParam Patient thePatient, HttpServletRequest theRequest) {
|
||||
ourUpdatedPatients.add(thePatient);
|
||||
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
|
||||
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedPatients.size());
|
||||
return new MethodOutcome(new IdType("Patient/1"), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startListenerServer() throws Exception {
|
||||
ourListenerPort = PortUtil.findFreePort();
|
||||
|
@ -227,7 +434,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
|
|||
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
|
||||
|
||||
ObservationListener obsListener = new ObservationListener();
|
||||
ourListenerRestServer.setResourceProviders(obsListener);
|
||||
PatientListener ptListener = new PatientListener();
|
||||
ourListenerRestServer.setResourceProviders(obsListener, ptListener);
|
||||
|
||||
ourListenerServer = new Server(ourListenerPort);
|
||||
|
||||
|
|
|
@ -83,7 +83,8 @@ public interface IAuthRuleBuilderRule {
|
|||
|
||||
/**
|
||||
* This rule applies to the FHIR transaction operation. Transaction is a special
|
||||
* case in that it bundles other operations
|
||||
* case in that it bundles other operations. This permission also allows FHIR
|
||||
* batch to be performed.
|
||||
*/
|
||||
IAuthRuleBuilderRuleTransaction transaction();
|
||||
|
||||
|
|
|
@ -223,7 +223,6 @@ public class RuleBuilder implements IAuthRuleBuilder {
|
|||
|
||||
@Override
|
||||
public IAuthRuleBuilderRuleTransaction transaction() {
|
||||
myRuleOp = RuleOpEnum.TRANSACTION;
|
||||
return new RuleBuilderRuleTransaction();
|
||||
}
|
||||
|
||||
|
@ -520,11 +519,20 @@ public class RuleBuilder implements IAuthRuleBuilder {
|
|||
|
||||
@Override
|
||||
public IAuthRuleBuilderRuleOpClassifierFinished andApplyNormalRules() {
|
||||
// Allow transaction
|
||||
RuleImplOp rule = new RuleImplOp(myRuleName);
|
||||
rule.setMode(myRuleMode);
|
||||
rule.setOp(myRuleOp);
|
||||
rule.setOp(RuleOpEnum.TRANSACTION);
|
||||
rule.setTransactionAppliesToOp(TransactionAppliesToEnum.ANY_OPERATION);
|
||||
myRules.add(rule);
|
||||
|
||||
// Allow batch
|
||||
rule = new RuleImplOp(myRuleName);
|
||||
rule.setMode(myRuleMode);
|
||||
rule.setOp(RuleOpEnum.BATCH);
|
||||
rule.setTransactionAppliesToOp(TransactionAppliesToEnum.ANY_OPERATION);
|
||||
myRules.add(rule);
|
||||
|
||||
return new RuleBuilderFinished(rule);
|
||||
}
|
||||
|
||||
|
|
|
@ -262,7 +262,7 @@ public class AuthorizationInterceptorDstu2Test {
|
|||
httpPost.setEntity(createFhirResourceEntity(input));
|
||||
status = ourClient.execute(httpPost);
|
||||
extractResponseAndClose(status);
|
||||
assertEquals(403, status.getStatusLine().getStatusCode());
|
||||
assertEquals(200, status.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -495,7 +495,7 @@ public class AuthorizationInterceptorDstu3Test {
|
|||
httpPost.setEntity(createFhirResourceEntity(input));
|
||||
status = ourClient.execute(httpPost);
|
||||
extractResponseAndClose(status);
|
||||
assertEquals(403, status.getStatusLine().getStatusCode());
|
||||
assertEquals(200, status.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -469,7 +469,7 @@ public class AuthorizationInterceptorR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testBatchWhenOnlyTransactionAllowed() throws Exception {
|
||||
public void testBatchAllowed() throws Exception {
|
||||
ourServlet.registerInterceptor(new AuthorizationInterceptor(PolicyEnum.DENY) {
|
||||
@Override
|
||||
public List<IAuthRule> buildRuleList(RequestDetails theRequestDetails) {
|
||||
|
@ -498,7 +498,7 @@ public class AuthorizationInterceptorR4Test {
|
|||
httpPost.setEntity(createFhirResourceEntity(input));
|
||||
status = ourClient.execute(httpPost);
|
||||
extractResponseAndClose(status);
|
||||
assertEquals(403, status.getStatusLine().getStatusCode());
|
||||
assertEquals(200, status.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -95,6 +95,11 @@
|
|||
When using the testpage overlay to delete a resource, currently a crash can occur
|
||||
if an unqualified ID is placed in the ID text box. This has been corrected.
|
||||
</action>
|
||||
<action type="fix">
|
||||
AuthorizationInterceptor did not allow FHIR batch operations when the transaction()
|
||||
permission is granted. This has been corrected so that transaction() allows both
|
||||
batch and transaction requests to proceed.
|
||||
</action>
|
||||
</release>
|
||||
|
||||
<release version="3.5.0" date="2018-09-17">
|
||||
|
|
Loading…
Reference in New Issue