Add retrigger subscription operation

This commit is contained in:
James Agnew 2018-09-27 20:03:01 -04:00
parent dc4c0809a4
commit 002c4b3ff7
22 changed files with 498 additions and 91 deletions

View File

@ -63,6 +63,7 @@ public class FhirServerConfig extends BaseJavaConfigDstu3 {
return retVal;
}
@Override
@Bean()
public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean retVal = new LocalContainerEntityManagerFactoryBean();

View File

@ -17,7 +17,7 @@ public class BaseCommandTest {
InputStream stdin = System.in;
try {
System.setIn(new ByteArrayInputStream("A VALUE".getBytes()));
System.setIn(new ByteArrayInputStream("A VALUE\n".getBytes()));
String value = new MyBaseCommand().read();
assertEquals("A VALUE", value);

View File

@ -56,6 +56,7 @@ import java.io.Reader;
import java.util.*;
import java.util.Map.Entry;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -1152,6 +1153,7 @@ public class GenericClient extends BaseClient implements IGenericClient {
version = null;
} else if (myId != null) {
resourceName = myId.getResourceType();
Validate.notBlank(defaultString(resourceName), "Can not invoke operation \"$%s\" on instance \"%s\" - No resource type specified", myOperationName, myId.getValue());
id = myId.getIdPart();
version = myId.getVersionIdPart();
} else {

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.config;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.HapiLocalizer;
import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider;
import ca.uhn.fhir.jpa.search.*;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
@ -108,6 +109,12 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return b.getObject();
}
@Bean
@Lazy
public SubscriptionRetriggeringProvider mySubscriptionRetriggeringProvider() {
return new SubscriptionRetriggeringProvider();
}
@Bean(autowire = Autowire.BY_TYPE)
public ISearchCoordinatorSvc searchCoordinatorSvc() {
return new SearchCoordinatorSvcImpl();

View File

@ -1079,6 +1079,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
String resourceName = defaultIfBlank(theResourceName, null);
Search search = new Search();
search.setDeleted(false);
search.setCreated(new Date());
search.setSearchLastReturned(new Date());
search.setLastUpdated(theSince, theUntil);

View File

@ -1,11 +1,16 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.Search;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.Date;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
/*
* #%L
* HAPI FHIR JPA Server
@ -15,9 +20,9 @@ import org.springframework.data.domain.Slice;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -26,13 +31,6 @@ import org.springframework.data.domain.Slice;
* #L%
*/
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import ca.uhn.fhir.jpa.entity.Search;
public interface ISearchDao extends JpaRepository<Search, Long> {
@Query("SELECT s FROM Search s WHERE s.myUuid = :uuid")
@ -44,11 +42,14 @@ public interface ISearchDao extends JpaRepository<Search, Long> {
// @Query("SELECT s FROM Search s WHERE s.myCreated < :cutoff")
// public Collection<Search> findWhereCreatedBefore(@Param("cutoff") Date theCutoff);
@Query("SELECT s FROM Search s WHERE s.myResourceType = :type AND mySearchQueryStringHash = :hash AND s.myCreated > :cutoff")
@Query("SELECT s FROM Search s WHERE s.myResourceType = :type AND mySearchQueryStringHash = :hash AND s.myCreated > :cutoff AND s.myDeleted = false")
Collection<Search> find(@Param("type") String theResourceType, @Param("hash") int theHashCode, @Param("cutoff") Date theCreatedCutoff);
@Modifying
@Query("UPDATE Search s SET s.mySearchLastReturned = :last WHERE s.myId = :pid")
void updateSearchLastReturned(@Param("pid") long thePid, @Param("last") Date theDate);
@Modifying
@Query("UPDATE Search s SET s.myDeleted = :deleted WHERE s.myId = :pid")
void updateDeleted(@Param("pid") Long thePid, @Param("deleted") boolean theDeleted);
}

View File

@ -1,9 +1,13 @@
package ca.uhn.fhir.jpa.dao.data;
import java.util.Collection;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchResult;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
/*
* #%L
@ -14,9 +18,9 @@ import org.springframework.data.domain.Pageable;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -25,23 +29,11 @@ import org.springframework.data.domain.Pageable;
* #L%
*/
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchResult;
public interface ISearchResultDao extends JpaRepository<SearchResult, Long> {
@Query(value="SELECT r FROM SearchResult r WHERE r.mySearch = :search")
Collection<SearchResult> findWithSearchUuid(@Param("search") Search theSearch);
@Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearch = :search ORDER BY r.myOrder ASC")
Page<Long> findWithSearchUuid(@Param("search") Search theSearch, Pageable thePage);
@Modifying
@Query(value="DELETE FROM SearchResult r WHERE r.mySearchPid = :search")
void deleteForSearch(@Param("search") Long theSearchPid);
@Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search")
Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid);
}

View File

@ -19,9 +19,9 @@ import static org.apache.commons.lang3.StringUtils.left;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -39,77 +39,61 @@ import static org.apache.commons.lang3.StringUtils.left;
})
public class Search implements Serializable {
private static final int MAX_SEARCH_QUERY_STRING = 10000;
@SuppressWarnings("WeakerAccess")
public static final int UUID_COLUMN_LENGTH = 36;
private static final int MAX_SEARCH_QUERY_STRING = 10000;
private static final int FAILURE_MESSAGE_LENGTH = 500;
private static final long serialVersionUID = 1L;
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "CREATED", nullable = false, updatable = false)
private Date myCreated;
@Column(name = "SEARCH_DELETED", nullable = true)
private Boolean myDeleted;
@Column(name = "FAILURE_CODE", nullable = true)
private Integer myFailureCode;
@Column(name = "FAILURE_MESSAGE", length = FAILURE_MESSAGE_LENGTH, nullable = true)
private String myFailureMessage;
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_SEARCH")
@SequenceGenerator(name = "SEQ_SEARCH", sequenceName = "SEQ_SEARCH")
@Column(name = "PID")
private Long myId;
@OneToMany(mappedBy = "mySearch")
private Collection<SearchInclude> myIncludes;
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "LAST_UPDATED_HIGH", nullable = true, insertable = true, updatable = false)
private Date myLastUpdatedHigh;
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "LAST_UPDATED_LOW", nullable = true, insertable = true, updatable = false)
private Date myLastUpdatedLow;
@Column(name = "NUM_FOUND", nullable = false)
private int myNumFound;
@Column(name = "PREFERRED_PAGE_SIZE", nullable = true)
private Integer myPreferredPageSize;
@Column(name = "RESOURCE_ID", nullable = true)
private Long myResourceId;
@Column(name = "RESOURCE_TYPE", length = 200, nullable = true)
private String myResourceType;
@OneToMany(mappedBy = "mySearch")
private Collection<SearchResult> myResults;
@NotNull
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "SEARCH_LAST_RETURNED", nullable = false, updatable = false)
private Date mySearchLastReturned;
@Lob()
@Basic(fetch = FetchType.LAZY)
@Column(name = "SEARCH_QUERY_STRING", nullable = true, updatable = false, length = MAX_SEARCH_QUERY_STRING)
private String mySearchQueryString;
@Column(name = "SEARCH_QUERY_STRING_HASH", nullable = true, updatable = false)
private Integer mySearchQueryStringHash;
@Enumerated(EnumType.ORDINAL)
@Column(name = "SEARCH_TYPE", nullable = false)
private SearchTypeEnum mySearchType;
@Enumerated(EnumType.STRING)
@Column(name = "SEARCH_STATUS", nullable = false, length = 10)
private SearchStatusEnum myStatus;
@Column(name = "TOTAL_COUNT", nullable = true)
private Integer myTotalCount;
@Column(name = "SEARCH_UUID", length = UUID_COLUMN_LENGTH, nullable = false, updatable = false)
private String myUuid;
@ -120,6 +104,14 @@ public class Search implements Serializable {
super();
}
public Boolean getDeleted() {
return myDeleted;
}
public void setDeleted(Boolean theDeleted) {
myDeleted = theDeleted;
}
public Date getCreated() {
return myCreated;
}

View File

@ -0,0 +1,69 @@
package ca.uhn.fhir.jpa.provider;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.annotation.RequiredParam;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.util.OperationOutcomeUtil;
import ca.uhn.fhir.util.ValidateUtil;
import org.hl7.fhir.dstu3.model.UriType;
import org.hl7.fhir.instance.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
public class SubscriptionRetriggeringProvider implements IResourceProvider {
public static final String RESOURCE_ID = "resourceId";
@Autowired
private FhirContext myFhirContext;
@Autowired
private IFhirSystemDao<?,?> mySystemDao;
@Autowired(required = false)
private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
@Operation(name= JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION)
public IBaseOperationOutcome reTriggerSubscription(
@IdParam IIdType theSubscriptionId,
@OperationParam(name= RESOURCE_ID) UriParam theResourceId) {
ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided");
IdType resourceId = new IdType(theResourceId.getValue());
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
Class<? extends IBaseResource> resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass();
IFhirResourceDao<? extends IBaseResource> dao = mySystemDao.getDao(resourceType);
IBaseResource resourceToRetrigger = dao.read(resourceId);
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(resourceToRetrigger.getIdElement());
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
msg.setNewPayload(myFhirContext, resourceToRetrigger);
for (BaseSubscriptionInterceptor<?> next :mySubscriptionInterceptorList) {
next.submitResourceModified(msg);
}
IBaseOperationOutcome retVal = OperationOutcomeUtil.newInstance(myFhirContext);
OperationOutcomeUtil.addIssue(myFhirContext, retVal, "information", "Triggered resource " + theResourceId.getValue() + " for subscription", null, null);
return retVal;
}
@Override
public Class<? extends IBaseResource> getResourceType() {
return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
}
}

View File

@ -321,6 +321,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
Search search = new Search();
search.setDeleted(false);
search.setUuid(searchUuid);
search.setCreated(new Date());
search.setSearchLastReturned(new Date());

View File

@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.dstu3.model.InstantType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Scheduled;
@ -70,8 +71,25 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> {
ourLog.info("Deleting search {}/{} - Created[{}] -- Last returned[{}]", searchToDelete.getId(), searchToDelete.getUuid(), new InstantType(searchToDelete.getCreated()), new InstantType(searchToDelete.getSearchLastReturned()));
mySearchIncludeDao.deleteForSearch(searchToDelete.getId());
mySearchResultDao.deleteForSearch(searchToDelete.getId());
mySearchDao.delete(searchToDelete);
/*
* Note, we're only deleting up to 1000 results in an individual search here. This
* is to prevent really long running transactions in cases where there are
* huge searches with tons of results in them. By the time we've gotten here
* we have marked the parent Search entity as deleted, so it's not such a
* huge deal to be only partially deleting search results. They'll get deleted
* eventually
*/
int max = 10000;
Slice<Long> resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId());
for (Long next : resultPids) {
mySearchResultDao.deleteById(next);
}
// Only delete if we don't have results left in this search
if (resultPids.getNumberOfElements() < max) {
mySearchDao.delete(searchToDelete);
}
});
}
@ -95,20 +113,18 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
ourLog.debug("Searching for searches which are before {}", cutoff);
TransactionTemplate tt = new TransactionTemplate(myTransactionManager);
final Slice<Long> toDelete = tt.execute(new TransactionCallback<Slice<Long>>() {
@Override
public Slice<Long> doInTransaction(TransactionStatus theStatus) {
return mySearchDao.findWhereLastReturnedBefore(cutoff, new PageRequest(0, 1000));
}
});
final Slice<Long> toDelete = tt.execute(theStatus ->
mySearchDao.findWhereLastReturnedBefore(cutoff, new PageRequest(0, 1000))
);
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
deleteSearch(nextSearchToDelete);
}
tt.execute(t->{
mySearchDao.updateDeleted(nextSearchToDelete, true);
return null;
});
tt.execute(t->{
deleteSearch(nextSearchToDelete);
return null;
});
}

View File

@ -442,7 +442,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.CREATE);
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
msg.setNewPayload(myCtx, theResource);
submitResourceModified(msg);
}
@ -451,7 +451,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.DELETE);
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.DELETE);
submitResourceModified(msg);
}
@ -463,7 +463,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
void submitResourceModifiedForUpdate(IBaseResource theNewResource) {
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theNewResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.UPDATE);
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setNewPayload(myCtx, theNewResource);
submitResourceModified(msg);
}
@ -509,8 +509,10 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@PostConstruct
public void start() {
for (IFhirResourceDao<?> next : myResourceDaos) {
if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) {
mySubscriptionDao = next;
if (next.getResourceType() != null) {
if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) {
mySubscriptionDao = next;
}
}
}
Validate.notNull(mySubscriptionDao);
@ -563,7 +565,10 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
});
}
protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
/**
* This is an internal API - Use with caution!
*/
public void submitResourceModified(final ResourceModifiedMessage theMsg) {
mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
sendToProcessingChannel(theMsg);
}

View File

@ -21,15 +21,12 @@ package ca.uhn.fhir.jpa.subscription;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.fasterxml.jackson.annotation.*;
import com.google.gson.Gson;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import java.io.Serializable;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceDeliveryMessage {
@ -45,13 +42,13 @@ public class ResourceDeliveryMessage {
@JsonProperty("payloadId")
private String myPayloadId;
@JsonProperty("operationType")
private RestOperationTypeEnum myOperationType;
private ResourceModifiedMessage.OperationTypeEnum myOperationType;
public RestOperationTypeEnum getOperationType() {
public ResourceModifiedMessage.OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(RestOperationTypeEnum theOperationType) {
public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.subscription;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
@ -38,12 +37,26 @@ public class ResourceModifiedMessage {
@JsonProperty("resourceId")
private String myId;
@JsonProperty("operationType")
private RestOperationTypeEnum myOperationType;
private OperationTypeEnum myOperationType;
/**
* This will only be set if the resource is being retriggered for a specific
* subscription
*/
@JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId;
@JsonProperty("newPayload")
private String myNewPayloadEncoded;
@JsonIgnore
private transient IBaseResource myNewPayload;
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
public IIdType getId(FhirContext theCtx) {
IIdType retVal = null;
if (myId != null) {
@ -59,11 +72,11 @@ public class ResourceModifiedMessage {
return myNewPayload;
}
public RestOperationTypeEnum getOperationType() {
public OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(RestOperationTypeEnum theOperationType) {
public void setOperationType(OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
@ -78,4 +91,14 @@ public class ResourceModifiedMessage {
myNewPayload = theNewPayload;
myNewPayloadEncoded = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
}
public enum OperationTypeEnum {
CREATE,
UPDATE,
DELETE,
MANUALLY_RETRIGGERED;
}
}

View File

@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
@ -162,7 +161,7 @@ public class SubscriptionActivatingSubscriber {
}
@SuppressWarnings("EnumSwitchStatementWhichMissesCases")
public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
public void handleMessage(ResourceModifiedMessage.OperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
switch (theOperationType) {
case DELETE:

View File

@ -39,6 +39,8 @@ import org.springframework.messaging.MessagingException;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
@ -59,7 +61,9 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
switch (msg.getOperationType()) {
case CREATE:
case UPDATE:
case MANUALLY_RETRIGGERED:
break;
case DELETE:
default:
ourLog.trace("Not processing modified message for {}", msg.getOperationType());
// ignore anything else
@ -79,6 +83,13 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue();
String nextCriteriaString = nextSubscription.getCriteriaString();
if (isNotBlank(msg.getSubscriptionId())) {
if (!msg.getSubscriptionId().equals(nextSubscriptionId)) {
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, msg.getSubscriptionId());
continue;
}
}
if (StringUtils.isBlank(nextCriteriaString)) {
continue;
}

View File

@ -174,4 +174,9 @@ public class JpaConstants {
* Operation name for the $document operation
*/
public static final String OPERATION_DOCUMENT = "$document";
/**
* Retrigger a subscription manually for a given resource
*/
public static final String OPERATION_RETRIGGER_SUBSCRIPTION = "$retrigger-subscription";
}

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.dao.dstu3.SearchParamRegistryDstu3;
import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.subscription.email.SubscriptionEmailInterceptor;
@ -97,9 +98,11 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
ourRestServer.getFhirContext().setNarrativeGenerator(new DefaultThymeleafNarrativeGenerator());
myTerminologyUploaderProvider = myAppCtx.getBean(TerminologyUploaderProviderDstu3.class);
ourRestServer.setPlainProviders(mySystemProvider, myTerminologyUploaderProvider);
SubscriptionRetriggeringProvider subscriptionRetriggeringProvider = myAppCtx.getBean(SubscriptionRetriggeringProvider.class);
ourRestServer.registerProvider(subscriptionRetriggeringProvider);
JpaConformanceProviderDstu3 confProvider = new JpaConformanceProviderDstu3(ourRestServer, mySystemDao, myDaoConfig);
confProvider.setImplementationDescription("THIS IS THE DESC");
ourRestServer.setServerConformanceProvider(confProvider);

View File

@ -0,0 +1,250 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.util.PortUtil;
import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.*;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* Test the rest-hook subscriptions
*/
@SuppressWarnings("Duplicates")
public class RetriggeringDstu3Test extends BaseResourceProviderDstu3Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RetriggeringDstu3Test.class);
private static List<Observation> ourCreatedObservations = Lists.newArrayList();
private static int ourListenerPort;
private static RestfulServer ourListenerRestServer;
private static Server ourListenerServer;
private static String ourListenerServerBase;
private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
private static List<String> ourContentTypes = new ArrayList<>();
private List<IIdType> mySubscriptionIds = new ArrayList<>();
@After
public void afterUnregisterRestHookListener() {
ourLog.info("**** Starting @After *****");
for (IIdType next : mySubscriptionIds) {
ourClient.delete().resourceById(next).execute();
}
mySubscriptionIds.clear();
myDaoConfig.setAllowMultipleDelete(true);
ourLog.info("Deleting all subscriptions");
ourClient.delete().resourceConditionalByUrl("Subscription?status=active").execute();
ourClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute();
ourLog.info("Done deleting all subscriptions");
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor);
}
@Before
public void beforeRegisterRestHookListener() {
ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor);
}
@Before
public void beforeReset() {
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria(theCriteria);
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(thePayload);
channel.setEndpoint(theEndpoint);
subscription.setChannel(channel);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
mySubscriptionIds.add(methodOutcome.getId());
waitForQueueToDrain();
return subscription;
}
private Observation sendObservation(String code, String system) {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode(code);
coding.setSystem(system);
observation.setStatus(Observation.ObservationStatus.FINAL);
MethodOutcome methodOutcome = ourClient.create().resource(observation).execute();
String observationId = methodOutcome.getId().getIdPart();
observation.setId(observationId);
return observation;
}
@Test
public void testRetriggerResourceToSpecificSubscription() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
IdType subscriptionId = createSubscription(criteria1, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription");
createSubscription(criteria2, payload, ourListenerServerBase).getIdElement();
IdType obsId = sendObservation(code, "SNOMED-CT").getIdElement().withResourceType("Observation");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient
.operation()
.onInstance(subscriptionId)
.named(JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionRetriggeringProvider.RESOURCE_ID, new UriType(obsId.toUnqualifiedVersionless().getValue()))
.execute();
OperationOutcome oo = (OperationOutcome) response.getParameter().get(0).getResource();
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", oo.getIssue().get(0).getDiagnostics());
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
}
@Test
public void testRetriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription");
IdType subscriptionId = createSubscription(criteria2, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription");
IdType obsId = sendObservation(code, "SNOMED-CT").getIdElement().withResourceType("Observation");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourClient.registerInterceptor(new LoggingInterceptor(true));
Parameters response = ourClient
.operation()
.onInstance(subscriptionId)
.named(JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, SubscriptionRetriggeringProvider.RESOURCE_ID, new UriType(obsId.toUnqualifiedVersionless().getValue()))
.execute();
OperationOutcome oo = (OperationOutcome) response.getParameter().get(0).getResource();
assertEquals("Triggered resource " + obsId.getValue() + " for subscription", oo.getIssue().get(0).getDiagnostics());
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
}
private void waitForQueueToDrain() throws InterruptedException {
RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor);
}
public static class ObservationListener implements IResourceProvider {
@Create
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourLog.info("Received Listener Create");
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourCreatedObservations.add(theObservation);
return new MethodOutcome(new IdType("Observation/1"), true);
}
@Override
public Class<? extends IBaseResource> getResourceType() {
return Observation.class;
}
@Update
public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourUpdatedObservations.add(theObservation);
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size());
return new MethodOutcome(new IdType("Observation/1"), false);
}
}
@BeforeClass
public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort();
ourListenerRestServer = new RestfulServer(FhirContext.forDstu3());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);
ourListenerServer = new Server(ourListenerPort);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourListenerRestServer);
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
ourListenerServer.setHandler(proxyHandler);
ourListenerServer.start();
}
@AfterClass
public static void stopListenerServer() throws Exception {
ourListenerServer.stop();
}
}

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.rest.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -17,6 +18,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.*;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicStatusLine;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
import org.junit.*;
@ -124,6 +126,22 @@ public class OperationClientR4Test {
assertEquals("http://foo/$nonrepeating?valstr=str&valtok=sys2%7Cval2", value.getURI().toASCIIString());
}
@Test
public void testOperationOnInstanceWithIncompleteInstanceId() throws Exception {
try {
ourGenClient
.operation()
.onInstance(new IdType("123"))
.named("nonrepeating")
.withSearchParameter(Parameters.class, "valstr", new StringParam("str"))
.execute();
fail();
} catch (IllegalArgumentException e) {
assertEquals("Can not invoke operation \"$nonrepeating\" on instance \"123\" - No resource type specified", e.getMessage());
}
}
@Test
public void testNonRepeatingUsingParameters() throws Exception {
Parameters response = ourAnnClient.nonrepeating(new StringParam("str"), new TokenParam("sys", "val"));

View File

@ -22,6 +22,20 @@
some database drivers did not automatically register and had to be manually added to
the classpath.
</action>
<action type="add">
The module which deletes stale searches has been modified so that it deletes very large
searches (searches with 10000+ results in the query cache) in smaller batches, in order
to avoid having very long running delete operations running.
</action>
<action type="fix">
When invoking an operation using the fluent client on an instance, the operation would
accidentally invoke against the server if the provided ID did not include a type. This
has been corrected so that an IllegalArgumentException is now thrown.
</action>
<action type="add">
A new operation has been added to the JPA server called $retrigger-subscription. This can
be used to cause a transaction to redeliver a resource that previously triggered.
</actiom>
</release>
<release version="3.5.0" date="2018-09-17">

View File

@ -75,7 +75,7 @@
<p>
This release is happening a little bit later than we had hoped. This release features
a complete reworking of the way that search indexes in the JPA server work, as well
as a new database migration tool that can be used to miograte to a new version of
as a new database migration tool that can be used to migrate to a new version of
HAPI FHIR. Testing these features ended up taking longer than we had hoped, but
we think it will be worth the wait.
</p>