From 002c4b3ff7ec48af5b4db89aceaa53b9e8209fca Mon Sep 17 00:00:00 2001 From: James Agnew Date: Thu, 27 Sep 2018 20:03:01 -0400 Subject: [PATCH] Add retrigger subscription operation --- .../uhn/fhir/jpa/demo/FhirServerConfig.java | 1 + .../java/ca/uhn/fhir/cli/BaseCommandTest.java | 2 +- .../fhir/rest/client/impl/GenericClient.java | 2 + .../ca/uhn/fhir/jpa/config/BaseConfig.java | 11 +- .../ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java | 1 + .../ca/uhn/fhir/jpa/dao/data/ISearchDao.java | 27 +- .../fhir/jpa/dao/data/ISearchResultDao.java | 28 +- .../java/ca/uhn/fhir/jpa/entity/Search.java | 34 +-- .../SubscriptionRetriggeringProvider.java | 69 +++++ .../jpa/search/SearchCoordinatorSvcImpl.java | 1 + .../search/StaleSearchDeletingSvcImpl.java | 44 ++- .../BaseSubscriptionInterceptor.java | 17 +- .../subscription/ResourceDeliveryMessage.java | 9 +- .../subscription/ResourceModifiedMessage.java | 35 ++- .../SubscriptionActivatingSubscriber.java | 3 +- .../SubscriptionCheckingSubscriber.java | 11 + .../ca/uhn/fhir/jpa/util/JpaConstants.java | 5 + .../dstu3/BaseResourceProviderDstu3Test.java | 5 +- .../subscription/RetriggeringDstu3Test.java | 250 ++++++++++++++++++ .../rest/client/OperationClientR4Test.java | 18 ++ src/changes/changes.xml | 14 + src/site/xdoc/index.xml | 2 +- 22 files changed, 498 insertions(+), 91 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionRetriggeringProvider.java create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/RetriggeringDstu3Test.java diff --git a/example-projects/hapi-fhir-jpaserver-example-postgres/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java b/example-projects/hapi-fhir-jpaserver-example-postgres/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java index 0d29294f06c..c2f14354b5c 100644 --- a/example-projects/hapi-fhir-jpaserver-example-postgres/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java +++ b/example-projects/hapi-fhir-jpaserver-example-postgres/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java @@ -63,6 +63,7 @@ public class FhirServerConfig extends BaseJavaConfigDstu3 { return retVal; } + @Override @Bean() public LocalContainerEntityManagerFactoryBean entityManagerFactory() { LocalContainerEntityManagerFactoryBean retVal = new LocalContainerEntityManagerFactoryBean(); diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/BaseCommandTest.java b/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/BaseCommandTest.java index 7329fb90d3c..8a1b992a8d9 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/BaseCommandTest.java +++ b/hapi-fhir-cli/hapi-fhir-cli-api/src/test/java/ca/uhn/fhir/cli/BaseCommandTest.java @@ -17,7 +17,7 @@ public class BaseCommandTest { InputStream stdin = System.in; try { - System.setIn(new ByteArrayInputStream("A VALUE".getBytes())); + System.setIn(new ByteArrayInputStream("A VALUE\n".getBytes())); String value = new MyBaseCommand().read(); assertEquals("A VALUE", value); diff --git a/hapi-fhir-client/src/main/java/ca/uhn/fhir/rest/client/impl/GenericClient.java b/hapi-fhir-client/src/main/java/ca/uhn/fhir/rest/client/impl/GenericClient.java index 1ec7a8b8af9..48252be8797 100644 --- a/hapi-fhir-client/src/main/java/ca/uhn/fhir/rest/client/impl/GenericClient.java +++ b/hapi-fhir-client/src/main/java/ca/uhn/fhir/rest/client/impl/GenericClient.java @@ -56,6 +56,7 @@ import java.io.Reader; import java.util.*; import java.util.Map.Entry; +import static org.apache.commons.lang3.StringUtils.defaultString; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -1152,6 +1153,7 @@ public class GenericClient extends BaseClient implements IGenericClient { version = null; } else if (myId != null) { resourceName = myId.getResourceType(); + Validate.notBlank(defaultString(resourceName), "Can not invoke operation \"$%s\" on instance \"%s\" - No resource type specified", myOperationName, myId.getValue()); id = myId.getIdPart(); version = myId.getVersionIdPart(); } else { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 2abb8c59d09..62fcbce0374 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.config; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.i18n.HapiLocalizer; +import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider; import ca.uhn.fhir.jpa.search.*; import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc; import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl; @@ -108,6 +109,12 @@ public abstract class BaseConfig implements SchedulingConfigurer { return b.getObject(); } + @Bean + @Lazy + public SubscriptionRetriggeringProvider mySubscriptionRetriggeringProvider() { + return new SubscriptionRetriggeringProvider(); + } + @Bean(autowire = Autowire.BY_TYPE) public ISearchCoordinatorSvc searchCoordinatorSvc() { return new SearchCoordinatorSvcImpl(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java index cf26d4f1f2f..49e579a1c29 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirDao.java @@ -1079,6 +1079,7 @@ public abstract class BaseHapiFhirDao implements IDao, String resourceName = defaultIfBlank(theResourceName, null); Search search = new Search(); + search.setDeleted(false); search.setCreated(new Date()); search.setSearchLastReturned(new Date()); search.setLastUpdated(theSince, theUntil); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchDao.java index 5414d0fcee4..da5ea870cf2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchDao.java @@ -1,11 +1,16 @@ package ca.uhn.fhir.jpa.dao.data; +import ca.uhn.fhir.jpa.entity.Search; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + import java.util.Collection; import java.util.Date; -import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Slice; - /* * #%L * HAPI FHIR JPA Server @@ -15,9 +20,9 @@ import org.springframework.data.domain.Slice; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,13 +31,6 @@ import org.springframework.data.domain.Slice; * #L% */ -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.Modifying; -import org.springframework.data.jpa.repository.Query; -import org.springframework.data.repository.query.Param; - -import ca.uhn.fhir.jpa.entity.Search; - public interface ISearchDao extends JpaRepository { @Query("SELECT s FROM Search s WHERE s.myUuid = :uuid") @@ -44,11 +42,14 @@ public interface ISearchDao extends JpaRepository { // @Query("SELECT s FROM Search s WHERE s.myCreated < :cutoff") // public Collection findWhereCreatedBefore(@Param("cutoff") Date theCutoff); - @Query("SELECT s FROM Search s WHERE s.myResourceType = :type AND mySearchQueryStringHash = :hash AND s.myCreated > :cutoff") + @Query("SELECT s FROM Search s WHERE s.myResourceType = :type AND mySearchQueryStringHash = :hash AND s.myCreated > :cutoff AND s.myDeleted = false") Collection find(@Param("type") String theResourceType, @Param("hash") int theHashCode, @Param("cutoff") Date theCreatedCutoff); @Modifying @Query("UPDATE Search s SET s.mySearchLastReturned = :last WHERE s.myId = :pid") void updateSearchLastReturned(@Param("pid") long thePid, @Param("last") Date theDate); + @Modifying + @Query("UPDATE Search s SET s.myDeleted = :deleted WHERE s.myId = :pid") + void updateDeleted(@Param("pid") Long thePid, @Param("deleted") boolean theDeleted); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchResultDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchResultDao.java index 4f7e05d9131..9206b89e400 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchResultDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/ISearchResultDao.java @@ -1,9 +1,13 @@ package ca.uhn.fhir.jpa.dao.data; -import java.util.Collection; - +import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.entity.SearchResult; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; /* * #%L @@ -14,9 +18,9 @@ import org.springframework.data.domain.Pageable; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,23 +29,11 @@ import org.springframework.data.domain.Pageable; * #L% */ -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.Modifying; -import org.springframework.data.jpa.repository.Query; -import org.springframework.data.repository.query.Param; - -import ca.uhn.fhir.jpa.entity.Search; -import ca.uhn.fhir.jpa.entity.SearchResult; - public interface ISearchResultDao extends JpaRepository { - @Query(value="SELECT r FROM SearchResult r WHERE r.mySearch = :search") - Collection findWithSearchUuid(@Param("search") Search theSearch); - @Query(value="SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearch = :search ORDER BY r.myOrder ASC") Page findWithSearchUuid(@Param("search") Search theSearch, Pageable thePage); - @Modifying - @Query(value="DELETE FROM SearchResult r WHERE r.mySearchPid = :search") - void deleteForSearch(@Param("search") Long theSearchPid); + @Query(value="SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search") + Slice findForSearch(Pageable thePage, @Param("search") Long theSearchPid); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Search.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Search.java index d3ed16c910b..a10e2e73560 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Search.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Search.java @@ -19,9 +19,9 @@ import static org.apache.commons.lang3.StringUtils.left; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,77 +39,61 @@ import static org.apache.commons.lang3.StringUtils.left; }) public class Search implements Serializable { - private static final int MAX_SEARCH_QUERY_STRING = 10000; @SuppressWarnings("WeakerAccess") public static final int UUID_COLUMN_LENGTH = 36; + private static final int MAX_SEARCH_QUERY_STRING = 10000; private static final int FAILURE_MESSAGE_LENGTH = 500; private static final long serialVersionUID = 1L; @Temporal(TemporalType.TIMESTAMP) @Column(name = "CREATED", nullable = false, updatable = false) private Date myCreated; - + @Column(name = "SEARCH_DELETED", nullable = true) + private Boolean myDeleted; @Column(name = "FAILURE_CODE", nullable = true) private Integer myFailureCode; - @Column(name = "FAILURE_MESSAGE", length = FAILURE_MESSAGE_LENGTH, nullable = true) private String myFailureMessage; - @Id @GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_SEARCH") @SequenceGenerator(name = "SEQ_SEARCH", sequenceName = "SEQ_SEARCH") @Column(name = "PID") private Long myId; - @OneToMany(mappedBy = "mySearch") private Collection myIncludes; - @Temporal(TemporalType.TIMESTAMP) @Column(name = "LAST_UPDATED_HIGH", nullable = true, insertable = true, updatable = false) private Date myLastUpdatedHigh; - @Temporal(TemporalType.TIMESTAMP) @Column(name = "LAST_UPDATED_LOW", nullable = true, insertable = true, updatable = false) private Date myLastUpdatedLow; - @Column(name = "NUM_FOUND", nullable = false) private int myNumFound; - @Column(name = "PREFERRED_PAGE_SIZE", nullable = true) private Integer myPreferredPageSize; - @Column(name = "RESOURCE_ID", nullable = true) private Long myResourceId; - @Column(name = "RESOURCE_TYPE", length = 200, nullable = true) private String myResourceType; - @OneToMany(mappedBy = "mySearch") private Collection myResults; - @NotNull @Temporal(TemporalType.TIMESTAMP) @Column(name = "SEARCH_LAST_RETURNED", nullable = false, updatable = false) private Date mySearchLastReturned; - @Lob() @Basic(fetch = FetchType.LAZY) @Column(name = "SEARCH_QUERY_STRING", nullable = true, updatable = false, length = MAX_SEARCH_QUERY_STRING) private String mySearchQueryString; - @Column(name = "SEARCH_QUERY_STRING_HASH", nullable = true, updatable = false) private Integer mySearchQueryStringHash; - @Enumerated(EnumType.ORDINAL) @Column(name = "SEARCH_TYPE", nullable = false) private SearchTypeEnum mySearchType; - @Enumerated(EnumType.STRING) @Column(name = "SEARCH_STATUS", nullable = false, length = 10) private SearchStatusEnum myStatus; - @Column(name = "TOTAL_COUNT", nullable = true) private Integer myTotalCount; - @Column(name = "SEARCH_UUID", length = UUID_COLUMN_LENGTH, nullable = false, updatable = false) private String myUuid; @@ -120,6 +104,14 @@ public class Search implements Serializable { super(); } + public Boolean getDeleted() { + return myDeleted; + } + + public void setDeleted(Boolean theDeleted) { + myDeleted = theDeleted; + } + public Date getCreated() { return myCreated; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionRetriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionRetriggeringProvider.java new file mode 100644 index 00000000000..0307dbb527c --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionRetriggeringProvider.java @@ -0,0 +1,69 @@ +package ca.uhn.fhir.jpa.provider; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; +import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.util.JpaConstants; +import ca.uhn.fhir.rest.annotation.IdParam; +import ca.uhn.fhir.rest.annotation.Operation; +import ca.uhn.fhir.rest.annotation.OperationParam; +import ca.uhn.fhir.rest.annotation.RequiredParam; +import ca.uhn.fhir.rest.param.UriParam; +import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.util.OperationOutcomeUtil; +import ca.uhn.fhir.util.ValidateUtil; +import org.hl7.fhir.dstu3.model.UriType; +import org.hl7.fhir.instance.model.IdType; +import org.hl7.fhir.instance.model.api.IBaseOperationOutcome; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; + +public class SubscriptionRetriggeringProvider implements IResourceProvider { + + public static final String RESOURCE_ID = "resourceId"; + @Autowired + private FhirContext myFhirContext; + @Autowired + private IFhirSystemDao mySystemDao; + @Autowired(required = false) + private List> mySubscriptionInterceptorList; + + @Operation(name= JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION) + public IBaseOperationOutcome reTriggerSubscription( + @IdParam IIdType theSubscriptionId, + @OperationParam(name= RESOURCE_ID) UriParam theResourceId) { + + ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided"); + IdType resourceId = new IdType(theResourceId.getValue()); + ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type"); + ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part"); + + Class resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass(); + IFhirResourceDao dao = mySystemDao.getDao(resourceType); + IBaseResource resourceToRetrigger = dao.read(resourceId); + + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(resourceToRetrigger.getIdElement()); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE); + msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); + msg.setNewPayload(myFhirContext, resourceToRetrigger); + + for (BaseSubscriptionInterceptor next :mySubscriptionInterceptorList) { + next.submitResourceModified(msg); + } + + IBaseOperationOutcome retVal = OperationOutcomeUtil.newInstance(myFhirContext); + OperationOutcomeUtil.addIssue(myFhirContext, retVal, "information", "Triggered resource " + theResourceId.getValue() + " for subscription", null, null); + return retVal; + } + + @Override + public Class getResourceType() { + return myFhirContext.getResourceDefinition("Subscription").getImplementingClass(); + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index 4fbd1b179a0..eb4b936e6e7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -321,6 +321,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } Search search = new Search(); + search.setDeleted(false); search.setUuid(searchUuid); search.setCreated(new Date()); search.setSearchLastReturned(new Date()); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java index a9a01664b34..a91711c9b83 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/StaleSearchDeletingSvcImpl.java @@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.dstu3.model.InstantType; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Slice; import org.springframework.scheduling.annotation.Scheduled; @@ -70,8 +71,25 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> { ourLog.info("Deleting search {}/{} - Created[{}] -- Last returned[{}]", searchToDelete.getId(), searchToDelete.getUuid(), new InstantType(searchToDelete.getCreated()), new InstantType(searchToDelete.getSearchLastReturned())); mySearchIncludeDao.deleteForSearch(searchToDelete.getId()); - mySearchResultDao.deleteForSearch(searchToDelete.getId()); - mySearchDao.delete(searchToDelete); + + /* + * Note, we're only deleting up to 1000 results in an individual search here. This + * is to prevent really long running transactions in cases where there are + * huge searches with tons of results in them. By the time we've gotten here + * we have marked the parent Search entity as deleted, so it's not such a + * huge deal to be only partially deleting search results. They'll get deleted + * eventually + */ + int max = 10000; + Slice resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId()); + for (Long next : resultPids) { + mySearchResultDao.deleteById(next); + } + + // Only delete if we don't have results left in this search + if (resultPids.getNumberOfElements() < max) { + mySearchDao.delete(searchToDelete); + } }); } @@ -95,20 +113,18 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { ourLog.debug("Searching for searches which are before {}", cutoff); TransactionTemplate tt = new TransactionTemplate(myTransactionManager); - final Slice toDelete = tt.execute(new TransactionCallback>() { - @Override - public Slice doInTransaction(TransactionStatus theStatus) { - return mySearchDao.findWhereLastReturnedBefore(cutoff, new PageRequest(0, 1000)); - } - }); - + final Slice toDelete = tt.execute(theStatus -> + mySearchDao.findWhereLastReturnedBefore(cutoff, new PageRequest(0, 1000)) + ); for (final Long nextSearchToDelete : toDelete) { ourLog.debug("Deleting search with PID {}", nextSearchToDelete); - tt.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { - deleteSearch(nextSearchToDelete); - } + tt.execute(t->{ + mySearchDao.updateDeleted(nextSearchToDelete, true); + return null; + }); + tt.execute(t->{ + deleteSearch(nextSearchToDelete); + return null; }); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index c32d53a2979..2d060c56575 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -442,7 +442,7 @@ public abstract class BaseSubscriptionInterceptor exten public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) { ResourceModifiedMessage msg = new ResourceModifiedMessage(); msg.setId(theResource.getIdElement()); - msg.setOperationType(RestOperationTypeEnum.CREATE); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE); msg.setNewPayload(myCtx, theResource); submitResourceModified(msg); } @@ -451,7 +451,7 @@ public abstract class BaseSubscriptionInterceptor exten public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) { ResourceModifiedMessage msg = new ResourceModifiedMessage(); msg.setId(theResource.getIdElement()); - msg.setOperationType(RestOperationTypeEnum.DELETE); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.DELETE); submitResourceModified(msg); } @@ -463,7 +463,7 @@ public abstract class BaseSubscriptionInterceptor exten void submitResourceModifiedForUpdate(IBaseResource theNewResource) { ResourceModifiedMessage msg = new ResourceModifiedMessage(); msg.setId(theNewResource.getIdElement()); - msg.setOperationType(RestOperationTypeEnum.UPDATE); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE); msg.setNewPayload(myCtx, theNewResource); submitResourceModified(msg); } @@ -509,8 +509,10 @@ public abstract class BaseSubscriptionInterceptor exten @PostConstruct public void start() { for (IFhirResourceDao next : myResourceDaos) { - if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) { - mySubscriptionDao = next; + if (next.getResourceType() != null) { + if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) { + mySubscriptionDao = next; + } } } Validate.notNull(mySubscriptionDao); @@ -563,7 +565,10 @@ public abstract class BaseSubscriptionInterceptor exten }); } - protected void submitResourceModified(final ResourceModifiedMessage theMsg) { + /** + * This is an internal API - Use with caution! + */ + public void submitResourceModified(final ResourceModifiedMessage theMsg) { mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx)); sendToProcessingChannel(theMsg); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java index 5034c26ed9a..f7f7c8d374c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceDeliveryMessage.java @@ -21,15 +21,12 @@ package ca.uhn.fhir.jpa.subscription; */ import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import com.fasterxml.jackson.annotation.*; import com.google.gson.Gson; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import java.io.Serializable; - @JsonInclude(JsonInclude.Include.NON_NULL) @JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class ResourceDeliveryMessage { @@ -45,13 +42,13 @@ public class ResourceDeliveryMessage { @JsonProperty("payloadId") private String myPayloadId; @JsonProperty("operationType") - private RestOperationTypeEnum myOperationType; + private ResourceModifiedMessage.OperationTypeEnum myOperationType; - public RestOperationTypeEnum getOperationType() { + public ResourceModifiedMessage.OperationTypeEnum getOperationType() { return myOperationType; } - public void setOperationType(RestOperationTypeEnum theOperationType) { + public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) { myOperationType = theOperationType; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java index 9b5ad2ebac7..e2ecc50c35c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessage.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.subscription; */ import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; @@ -38,12 +37,26 @@ public class ResourceModifiedMessage { @JsonProperty("resourceId") private String myId; @JsonProperty("operationType") - private RestOperationTypeEnum myOperationType; + private OperationTypeEnum myOperationType; + /** + * This will only be set if the resource is being retriggered for a specific + * subscription + */ + @JsonProperty(value = "subscriptionId", required = false) + private String mySubscriptionId; @JsonProperty("newPayload") private String myNewPayloadEncoded; @JsonIgnore private transient IBaseResource myNewPayload; + public String getSubscriptionId() { + return mySubscriptionId; + } + + public void setSubscriptionId(String theSubscriptionId) { + mySubscriptionId = theSubscriptionId; + } + public IIdType getId(FhirContext theCtx) { IIdType retVal = null; if (myId != null) { @@ -59,11 +72,11 @@ public class ResourceModifiedMessage { return myNewPayload; } - public RestOperationTypeEnum getOperationType() { + public OperationTypeEnum getOperationType() { return myOperationType; } - public void setOperationType(RestOperationTypeEnum theOperationType) { + public void setOperationType(OperationTypeEnum theOperationType) { myOperationType = theOperationType; } @@ -78,4 +91,14 @@ public class ResourceModifiedMessage { myNewPayload = theNewPayload; myNewPayloadEncoded = theCtx.newJsonParser().encodeResourceToString(theNewPayload); } + + + public enum OperationTypeEnum { + CREATE, + UPDATE, + DELETE, + MANUALLY_RETRIGGERED; + + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index 7e78222f59e..7b80f1dfef8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -22,7 +22,6 @@ package ca.uhn.fhir.jpa.subscription; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; -import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.SubscriptionUtil; import com.google.common.annotations.VisibleForTesting; @@ -162,7 +161,7 @@ public class SubscriptionActivatingSubscriber { } @SuppressWarnings("EnumSwitchStatementWhichMissesCases") - public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException { + public void handleMessage(ResourceModifiedMessage.OperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException { switch (theOperationType) { case DELETE: diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java index 16184c069cb..b4a43062f64 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionCheckingSubscriber.java @@ -39,6 +39,8 @@ import org.springframework.messaging.MessagingException; import java.util.List; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class); @@ -59,7 +61,9 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { switch (msg.getOperationType()) { case CREATE: case UPDATE: + case MANUALLY_RETRIGGERED: break; + case DELETE: default: ourLog.trace("Not processing modified message for {}", msg.getOperationType()); // ignore anything else @@ -79,6 +83,13 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber { String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue(); String nextCriteriaString = nextSubscription.getCriteriaString(); + if (isNotBlank(msg.getSubscriptionId())) { + if (!msg.getSubscriptionId().equals(nextSubscriptionId)) { + ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, msg.getSubscriptionId()); + continue; + } + } + if (StringUtils.isBlank(nextCriteriaString)) { continue; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/JpaConstants.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/JpaConstants.java index 420e318ea65..573af494f63 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/JpaConstants.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/JpaConstants.java @@ -174,4 +174,9 @@ public class JpaConstants { * Operation name for the $document operation */ public static final String OPERATION_DOCUMENT = "$document"; + + /** + * Retrigger a subscription manually for a given resource + */ + public static final String OPERATION_RETRIGGER_SUBSCRIPTION = "$retrigger-subscription"; } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java index 607b8ddbfe2..c0eb8435436 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.config.WebsocketDispatcherConfig; import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test; import ca.uhn.fhir.jpa.dao.dstu3.SearchParamRegistryDstu3; +import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; import ca.uhn.fhir.jpa.subscription.email.SubscriptionEmailInterceptor; @@ -97,9 +98,11 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test { ourRestServer.getFhirContext().setNarrativeGenerator(new DefaultThymeleafNarrativeGenerator()); myTerminologyUploaderProvider = myAppCtx.getBean(TerminologyUploaderProviderDstu3.class); - ourRestServer.setPlainProviders(mySystemProvider, myTerminologyUploaderProvider); + SubscriptionRetriggeringProvider subscriptionRetriggeringProvider = myAppCtx.getBean(SubscriptionRetriggeringProvider.class); + ourRestServer.registerProvider(subscriptionRetriggeringProvider); + JpaConformanceProviderDstu3 confProvider = new JpaConformanceProviderDstu3(ourRestServer, mySystemDao, myDaoConfig); confProvider.setImplementationDescription("THIS IS THE DESC"); ourRestServer.setServerConformanceProvider(confProvider); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/RetriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/RetriggeringDstu3Test.java new file mode 100644 index 00000000000..f2eb11e0653 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/RetriggeringDstu3Test.java @@ -0,0 +1,250 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.DaoConfig; +import ca.uhn.fhir.jpa.provider.SubscriptionRetriggeringProvider; +import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test; +import ca.uhn.fhir.jpa.util.JpaConstants; +import ca.uhn.fhir.rest.annotation.Create; +import ca.uhn.fhir.rest.annotation.ResourceParam; +import ca.uhn.fhir.rest.annotation.Update; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; +import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.util.PortUtil; +import com.google.common.collect.Lists; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.hl7.fhir.dstu3.model.*; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.junit.*; + +import javax.servlet.http.HttpServletRequest; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Test the rest-hook subscriptions + */ +@SuppressWarnings("Duplicates") +public class RetriggeringDstu3Test extends BaseResourceProviderDstu3Test { + + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RetriggeringDstu3Test.class); + private static List ourCreatedObservations = Lists.newArrayList(); + private static int ourListenerPort; + private static RestfulServer ourListenerRestServer; + private static Server ourListenerServer; + private static String ourListenerServerBase; + private static List ourUpdatedObservations = Lists.newArrayList(); + private static List ourContentTypes = new ArrayList<>(); + private List mySubscriptionIds = new ArrayList<>(); + + @After + public void afterUnregisterRestHookListener() { + ourLog.info("**** Starting @After *****"); + + for (IIdType next : mySubscriptionIds) { + ourClient.delete().resourceById(next).execute(); + } + mySubscriptionIds.clear(); + + myDaoConfig.setAllowMultipleDelete(true); + ourLog.info("Deleting all subscriptions"); + ourClient.delete().resourceConditionalByUrl("Subscription?status=active").execute(); + ourClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute(); + ourLog.info("Done deleting all subscriptions"); + myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); + + ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); + + } + + @Before + public void beforeRegisterRestHookListener() { + ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); + } + + @Before + public void beforeReset() { + ourCreatedObservations.clear(); + ourUpdatedObservations.clear(); + ourContentTypes.clear(); + } + + private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { + Subscription subscription = new Subscription(); + subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); + subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); + subscription.setCriteria(theCriteria); + + Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); + channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload(thePayload); + channel.setEndpoint(theEndpoint); + subscription.setChannel(channel); + + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + mySubscriptionIds.add(methodOutcome.getId()); + + waitForQueueToDrain(); + + return subscription; + } + + private Observation sendObservation(String code, String system) { + Observation observation = new Observation(); + CodeableConcept codeableConcept = new CodeableConcept(); + observation.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code); + coding.setSystem(system); + + observation.setStatus(Observation.ObservationStatus.FINAL); + + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + + String observationId = methodOutcome.getId().getIdPart(); + observation.setId(observationId); + + return observation; + } + + @Test + public void testRetriggerResourceToSpecificSubscription() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + IdType subscriptionId = createSubscription(criteria1, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription"); + createSubscription(criteria2, payload, ourListenerServerBase).getIdElement(); + + IdType obsId = sendObservation(code, "SNOMED-CT").getIdElement().withResourceType("Observation"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(subscriptionId) + .named(JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionRetriggeringProvider.RESOURCE_ID, new UriType(obsId.toUnqualifiedVersionless().getValue())) + .execute(); + + OperationOutcome oo = (OperationOutcome) response.getParameter().get(0).getResource(); + assertEquals("Triggered resource " + obsId.getValue() + " for subscription", oo.getIssue().get(0).getDiagnostics()); + + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + + } + + @Test + public void testRetriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + createSubscription(criteria1, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription"); + IdType subscriptionId = createSubscription(criteria2, payload, ourListenerServerBase).getIdElement().withResourceType("Subscription"); + + IdType obsId = sendObservation(code, "SNOMED-CT").getIdElement().withResourceType("Observation"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(subscriptionId) + .named(JpaConstants.OPERATION_RETRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionRetriggeringProvider.RESOURCE_ID, new UriType(obsId.toUnqualifiedVersionless().getValue())) + .execute(); + + OperationOutcome oo = (OperationOutcome) response.getParameter().get(0).getResource(); + assertEquals("Triggered resource " + obsId.getValue() + " for subscription", oo.getIssue().get(0).getDiagnostics()); + + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + + } + + + private void waitForQueueToDrain() throws InterruptedException { + RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor); + } + + public static class ObservationListener implements IResourceProvider { + + @Create + public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { + ourLog.info("Received Listener Create"); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourCreatedObservations.add(theObservation); + return new MethodOutcome(new IdType("Observation/1"), true); + } + + @Override + public Class getResourceType() { + return Observation.class; + } + + @Update + public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { + ourUpdatedObservations.add(theObservation); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size()); + return new MethodOutcome(new IdType("Observation/1"), false); + } + + } + + @BeforeClass + public static void startListenerServer() throws Exception { + ourListenerPort = PortUtil.findFreePort(); + ourListenerRestServer = new RestfulServer(FhirContext.forDstu3()); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + + ObservationListener obsListener = new ObservationListener(); + ourListenerRestServer.setResourceProviders(obsListener); + + ourListenerServer = new Server(ourListenerPort); + + ServletContextHandler proxyHandler = new ServletContextHandler(); + proxyHandler.setContextPath("/"); + + ServletHolder servletHolder = new ServletHolder(); + servletHolder.setServlet(ourListenerRestServer); + proxyHandler.addServlet(servletHolder, "/fhir/context/*"); + + ourListenerServer.setHandler(proxyHandler); + ourListenerServer.start(); + } + + @AfterClass + public static void stopListenerServer() throws Exception { + ourListenerServer.stop(); + } + +} diff --git a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/OperationClientR4Test.java b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/OperationClientR4Test.java index cbc8211120a..94d686d93da 100644 --- a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/OperationClientR4Test.java +++ b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/client/OperationClientR4Test.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.rest.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -17,6 +18,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.*; import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicStatusLine; +import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.StringType; import org.junit.*; @@ -124,6 +126,22 @@ public class OperationClientR4Test { assertEquals("http://foo/$nonrepeating?valstr=str&valtok=sys2%7Cval2", value.getURI().toASCIIString()); } + + @Test + public void testOperationOnInstanceWithIncompleteInstanceId() throws Exception { + try { + ourGenClient + .operation() + .onInstance(new IdType("123")) + .named("nonrepeating") + .withSearchParameter(Parameters.class, "valstr", new StringParam("str")) + .execute(); + fail(); + } catch (IllegalArgumentException e) { + assertEquals("Can not invoke operation \"$nonrepeating\" on instance \"123\" - No resource type specified", e.getMessage()); + } + } + @Test public void testNonRepeatingUsingParameters() throws Exception { Parameters response = ourAnnClient.nonrepeating(new StringParam("str"), new TokenParam("sys", "val")); diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 11819061aaa..07a90d11e5d 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -22,6 +22,20 @@ some database drivers did not automatically register and had to be manually added to the classpath. + + The module which deletes stale searches has been modified so that it deletes very large + searches (searches with 10000+ results in the query cache) in smaller batches, in order + to avoid having very long running delete operations running. + + + When invoking an operation using the fluent client on an instance, the operation would + accidentally invoke against the server if the provided ID did not include a type. This + has been corrected so that an IllegalArgumentException is now thrown. + + + A new operation has been added to the JPA server called $retrigger-subscription. This can + be used to cause a transaction to redeliver a resource that previously triggered. + diff --git a/src/site/xdoc/index.xml b/src/site/xdoc/index.xml index 02941afa2d9..4585434f8dc 100644 --- a/src/site/xdoc/index.xml +++ b/src/site/xdoc/index.xml @@ -75,7 +75,7 @@

This release is happening a little bit later than we had hoped. This release features a complete reworking of the way that search indexes in the JPA server work, as well - as a new database migration tool that can be used to miograte to a new version of + as a new database migration tool that can be used to migrate to a new version of HAPI FHIR. Testing these features ended up taking longer than we had hoped, but we think it will be worth the wait.