Merge remote-tracking branch 'origin/master' into im_20200728_term_multi_version_support

This commit is contained in:
ianmarshall 2020-09-20 22:02:08 -04:00
commit f9f8bf6515
39 changed files with 657 additions and 431 deletions

View File

@ -1620,7 +1620,7 @@ public enum Pointcut {
* <p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li>
* <li>ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li>
* <li>ca.uhn.fhir.empi.model.TransactionLogMessages - This parameter is for informational messages provided by the EMPI module during EMPI procesing. .</li>
* </ul>
* </p>
@ -1628,7 +1628,7 @@ public enum Pointcut {
* Hooks should return <code>void</code>.
* </p>
*/
EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"),
EMPI_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage", "ca.uhn.fhir.rest.server.TransactionLogMessages"),
/**
* <b>Performance Tracing Hook:</b>

View File

@ -0,0 +1,6 @@
---
type: add
issue: 2083
title: "The JPA search coordinator will now use worker threads sourced from a ThreadPoolTaskExecutor, in order to simplify
the addition of decorators to those threads. Thanks to Tue Toft Nørgård for the pull requets!"

View File

@ -0,0 +1,5 @@
---
type: add
issue: 2087
title: "Added new DaoConfig parameter called maximumTransactionBundleSize that if not-null will throw a
PayloadTooLarge exception when the number of resources in a transaction bundle exceeds this size."

View File

@ -80,6 +80,7 @@ public class DaoConfig {
* @see #setMaximumSearchResultCountInTransaction(Integer)
*/
private static final Integer DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION = null;
private static final Integer DEFAULT_MAXIMUM_TRANSACTION_BUNDLE_SIZE = null;
private static final Logger ourLog = LoggerFactory.getLogger(DaoConfig.class);
private static final int DEFAULT_EXPUNGE_BATCH_SIZE = 800;
private IndexEnabledEnum myIndexMissingFieldsEnabled = IndexEnabledEnum.DISABLED;
@ -126,6 +127,8 @@ public class DaoConfig {
private boolean myIndexContainedResources = true;
private int myMaximumExpansionSize = DEFAULT_MAX_EXPANSION_SIZE;
private Integer myMaximumSearchResultCountInTransaction = DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION;
private Integer myMaximumTransactionBundleSize = DEFAULT_MAXIMUM_TRANSACTION_BUNDLE_SIZE;
private ResourceEncodingEnum myResourceEncoding = ResourceEncodingEnum.JSONC;
/**
* update setter javadoc if default changes
@ -655,6 +658,31 @@ public class DaoConfig {
myMaximumSearchResultCountInTransaction = theMaximumSearchResultCountInTransaction;
}
/**
* Specifies the maximum number of resources permitted within a single transaction bundle.
* If a transaction bundle is submitted with more than this number of resources, it will be
* rejected with a PayloadTooLarge exception.
* <p>
* The default value is <code>null</code>, which means that there is no limit.
* </p>
*/
public Integer getMaximumTransactionBundleSize() {
return myMaximumTransactionBundleSize;
}
/**
* Specifies the maximum number of resources permitted within a single transaction bundle.
* If a transaction bundle is submitted with more than this number of resources, it will be
* rejected with a PayloadTooLarge exception.
* <p>
* The default value is <code>null</code>, which means that there is no limit.
* </p>
*/
public DaoConfig setMaximumTransactionBundleSize(Integer theMaximumTransactionBundleSize) {
myMaximumTransactionBundleSize = theMaximumTransactionBundleSize;
return this;
}
/**
* This setting controls the number of threads allocated to resource reindexing
* (which is only ever used if SearchParameters change, or a manual reindex is

View File

@ -262,6 +262,14 @@ public abstract class BaseConfig {
return new DatabaseSearchResultCacheSvcImpl();
}
@Bean
public ThreadPoolTaskExecutor searchCoordinatorThreadFactory() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("search_coord_");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean
public TaskScheduler taskScheduler() {
ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler();

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IJpaDao;
@ -54,6 +55,7 @@ import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.NotModifiedException;
import ca.uhn.fhir.rest.server.exceptions.PayloadTooLargeException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.method.BaseMethodBinding;
import ca.uhn.fhir.rest.server.method.BaseResourceReturningMethodBinding;
@ -124,6 +126,8 @@ public abstract class BaseTransactionProcessor {
private MatchResourceUrlService myMatchResourceUrlService;
@Autowired
private HapiTransactionService myHapiTransactionService;
@Autowired
private DaoConfig myDaoConfig;
@PostConstruct
public void start() {
@ -342,7 +346,15 @@ public abstract class BaseTransactionProcessor {
throw new InvalidRequestException("Unable to process transaction where incoming Bundle.type = " + transactionType);
}
ourLog.debug("Beginning {} with {} resources", theActionName, myVersionAdapter.getEntries(theRequest).size());
int numberOfEntries = myVersionAdapter.getEntries(theRequest).size();
if (myDaoConfig.getMaximumTransactionBundleSize() != null && numberOfEntries > myDaoConfig.getMaximumTransactionBundleSize()) {
throw new PayloadTooLargeException("Transaction Bundle Too large. Transaction bundle contains " +
numberOfEntries +
" which exceedes the maximum permitted transaction bundle size of " + myDaoConfig.getMaximumTransactionBundleSize());
}
ourLog.debug("Beginning {} with {} resources", theActionName, numberOfEntries);
final TransactionDetails transactionDetails = new TransactionDetails();
final StopWatch transactionStopWatch = new StopWatch();
@ -350,7 +362,7 @@ public abstract class BaseTransactionProcessor {
List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest);
// Do all entries have a verb?
for (int i = 0; i < myVersionAdapter.getEntries(theRequest).size(); i++) {
for (int i = 0; i < numberOfEntries; i++) {
IBase nextReqEntry = requestEntries.get(i);
String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextReqEntry);
if (verb == null || !isValidVerb(verb)) {

View File

@ -37,7 +37,6 @@ import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchInclude;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
@ -54,6 +53,7 @@ import ca.uhn.fhir.rest.api.SummaryEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
@ -80,7 +80,7 @@ import org.springframework.data.domain.Sort;
import org.springframework.orm.jpa.JpaDialect;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -111,7 +111,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -161,9 +160,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
/**
* Constructor
*/
public SearchCoordinatorSvcImpl() {
CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
myExecutor = Executors.newCachedThreadPool(threadFactory);
@Autowired
public SearchCoordinatorSvcImpl(ThreadPoolTaskExecutor searchCoordinatorThreadFactory) {
myExecutor = searchCoordinatorThreadFactory.getThreadPoolExecutor();
}
@VisibleForTesting

View File

@ -1,46 +0,0 @@
package ca.uhn.fhir.jpa.dao.dstu3;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import ca.uhn.fhir.util.TestUtil;
public class FhirSystemDaoDstu3SearchTest extends BaseJpaDstu3SystemTest {
@Test
public void testSearchByParans() {
// code to come.. just here to prevent a failure
}
/*//@formatter:off
* [ERROR] Search parameter action has conflicting types token and reference
* [ERROR] Search parameter source has conflicting types token and reference
* [ERROR] Search parameter plan has conflicting types reference and token
* [ERROR] Search parameter version has conflicting types token and string
* [ERROR] Search parameter source has conflicting types reference and uri
* [ERROR] Search parameter location has conflicting types reference and uri
* [ERROR] Search parameter title has conflicting types string and token
* [ERROR] Search parameter manufacturer has conflicting types string and reference
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter source has conflicting types reference and string
* [ERROR] Search parameter destination has conflicting types reference and string
* [ERROR] Search parameter responsible has conflicting types reference and string
* [ERROR] Search parameter value has conflicting types token and string
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter action has conflicting types reference and token
* [ERROR] Search parameter version has conflicting types token and string
* [ERROR] Search parameter address has conflicting types token and string
* [ERROR] Search parameter base has conflicting types reference and token
* [ERROR] Search parameter target has conflicting types reference and token
* [ERROR] Search parameter base has conflicting types reference and uri
* [ERROR] Search parameter contact has conflicting types string and token
* [ERROR] Search parameter substance has conflicting types token and reference
* [ERROR] Search parameter provider has conflicting types reference and token
* [ERROR] Search parameter system has conflicting types token and uri
* [ERROR] Search parameter reference has conflicting types reference and uri
* //@formatter:off
*/
}

View File

@ -2888,108 +2888,6 @@ public class FhirSystemDaoDstu3Test extends BaseJpaDstu3SystemTest {
assertEquals(1, found.size().intValue());
}
//
//
// /**
// * Issue #55
// */
// @Test
// public void testTransactionWithCidIds() throws Exception {
// Bundle request = new Bundle();
//
// Patient p1 = new Patient();
// p1.setId("cid:patient1");
// p1.addIdentifier().setSystem("system").setValue("testTransactionWithCidIds01");
// res.add(p1);
//
// Observation o1 = new Observation();
// o1.setId("cid:observation1");
// o1.getIdentifier().setSystem("system").setValue("testTransactionWithCidIds02");
// o1.setSubject(new Reference("Patient/cid:patient1"));
// res.add(o1);
//
// Observation o2 = new Observation();
// o2.setId("cid:observation2");
// o2.getIdentifier().setSystem("system").setValue("testTransactionWithCidIds03");
// o2.setSubject(new Reference("Patient/cid:patient1"));
// res.add(o2);
//
// ourSystemDao.transaction(res);
//
// assertTrue(p1.getId().getValue(), p1.getId().getIdPart().matches("^[0-9]+$"));
// assertTrue(o1.getId().getValue(), o1.getId().getIdPart().matches("^[0-9]+$"));
// assertTrue(o2.getId().getValue(), o2.getId().getIdPart().matches("^[0-9]+$"));
//
// assertThat(o1.getSubject().getReference().getValue(), endsWith("Patient/" + p1.getId().getIdPart()));
// assertThat(o2.getSubject().getReference().getValue(), endsWith("Patient/" + p1.getId().getIdPart()));
//
// }
//
// @Test
// public void testTransactionWithDelete() throws Exception {
// Bundle request = new Bundle();
//
// /*
// * Create 3
// */
//
// List<IResource> res;
// res = new ArrayList<IResource>();
//
// Patient p1 = new Patient();
// p1.addIdentifier().setSystem("urn:system").setValue("testTransactionWithDelete");
// res.add(p1);
//
// Patient p2 = new Patient();
// p2.addIdentifier().setSystem("urn:system").setValue("testTransactionWithDelete");
// res.add(p2);
//
// Patient p3 = new Patient();
// p3.addIdentifier().setSystem("urn:system").setValue("testTransactionWithDelete");
// res.add(p3);
//
// ourSystemDao.transaction(res);
//
// /*
// * Verify
// */
//
// IBundleProvider results = ourPatientDao.search(Patient.SP_IDENTIFIER, new TokenParam("urn:system",
// "testTransactionWithDelete"));
// assertEquals(3, results.size());
//
// /*
// * Now delete 2
// */
//
// request = new Bundle();
// res = new ArrayList<IResource>();
// List<IResource> existing = results.getResources(0, 3);
//
// p1 = new Patient();
// p1.setId(existing.get(0).getId());
// ResourceMetadataKeyEnum.DELETED_AT.put(p1, InstantDt.withCurrentTime());
// res.add(p1);
//
// p2 = new Patient();
// p2.setId(existing.get(1).getId());
// ResourceMetadataKeyEnum.DELETED_AT.put(p2, InstantDt.withCurrentTime());
// res.add(p2);
//
// ourSystemDao.transaction(res);
//
// /*
// * Verify
// */
//
// IBundleProvider results2 = ourPatientDao.search(Patient.SP_IDENTIFIER, new TokenParam("urn:system",
// "testTransactionWithDelete"));
// assertEquals(1, results2.size());
// List<IResource> existing2 = results2.getResources(0, 1);
// assertEquals(existing2.get(0).getId(), existing.get(2).getId());
//
// }
@Test
public void testTransactionWithRelativeOidIds() {
Bundle res = new Bundle();

View File

@ -0,0 +1,78 @@
package ca.uhn.fhir.jpa.dao.dstu3;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.rest.server.exceptions.PayloadTooLargeException;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.dstu3.model.Bundle.BundleType;
import org.hl7.fhir.dstu3.model.Bundle.HTTPVerb;
import org.hl7.fhir.dstu3.model.Observation;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class FhirSystemDaoTransactionDstu3Test extends BaseJpaDstu3SystemTest {
public static final int TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE = 5;
@AfterEach
public void after() {
myDaoConfig.setMaximumTransactionBundleSize(new DaoConfig().getMaximumTransactionBundleSize());
}
@BeforeEach
public void beforeDisableResultReuse() {
myDaoConfig.setMaximumTransactionBundleSize(TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE);
}
private Bundle createInputTransactionWithSize(int theSize) {
Bundle retval = new Bundle();
retval.setType(BundleType.TRANSACTION);
for (int i = 0; i < theSize; ++i) {
Observation obs = new Observation();
obs.setStatus(Observation.ObservationStatus.FINAL);
retval
.addEntry()
.setFullUrl("urn:uuid:000" + i)
.setResource(obs)
.getRequest()
.setMethod(HTTPVerb.POST);
}
return retval;
}
@Test
public void testTransactionTooBig() {
Bundle bundle = createInputTransactionWithSize(TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE + 1);
try {
mySystemDao.transaction(null, bundle);
fail();
} catch (PayloadTooLargeException e) {
assertThat(e.getMessage(), containsString("Transaction Bundle Too large. Transaction bundle contains " +
(TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE + 1) +
" which exceedes the maximum permitted transaction bundle size of " + TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE));
}
}
@Test
public void testTransactionSmallEnough() {
testTransactionBundleSucceedsWithSize(TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE);
testTransactionBundleSucceedsWithSize(TEST_MAXIMUM_TRANSACTION_BUNDLE_SIZE - 1);
testTransactionBundleSucceedsWithSize(1);
}
private void testTransactionBundleSucceedsWithSize(int theSize) {
Bundle bundle = createInputTransactionWithSize(theSize);
Bundle response = mySystemDao.transaction(null, bundle);
assertEquals(theSize, response.getEntry().size());
assertEquals("201 Created", response.getEntry().get(0).getResponse().getStatus());
assertEquals("201 Created", response.getEntry().get(theSize - 1).getResponse().getStatus());
}
}

View File

@ -384,6 +384,7 @@ public class NpmTestR4 extends BaseJpaR4Test {
PackageInstallationSpec spec = new PackageInstallationSpec().setName("hl7.fhir.uv.shorthand").setVersion("0.12.0").setInstallMode(PackageInstallationSpec.InstallModeEnum.STORE_ONLY);
igInstaller.install(spec);
runInTransaction(() -> {
NpmPackageVersionEntity versionEntity = myPackageVersionDao.findByPackageIdAndVersion("hl7.fhir.uv.shorthand", "0.12.0").orElseThrow(() -> new IllegalArgumentException());
assertEquals(true, versionEntity.isCurrentVersion());

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.config.dstu3.BaseDstu3Config;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilder;
@ -124,7 +125,7 @@ public class SearchCoordinatorSvcImplTest {
myCurrentSearch = null;
mySvc = new SearchCoordinatorSvcImpl();
mySvc = new SearchCoordinatorSvcImpl(new BaseDstu3Config().searchCoordinatorThreadFactory());
mySvc.setEntityManagerForUnitTest(myEntityManager);
mySvc.setTransactionManagerForUnitTest(myTxManager);
mySvc.setContextForUnitTest(ourCtx);

View File

@ -33,6 +33,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.TransactionLogMessages;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@ -93,16 +94,20 @@ public class EmpiMessageHandler implements MessageHandler {
}catch (Exception e) {
log(empiContext, "Failure during EMPI processing: " + e.getMessage());
} finally {
// Interceptor call: EMPI_AFTER_PERSISTED_RESOURCE_CHECKED
ResourceOperationMessage outgoingMsg = new ResourceOperationMessage(myFhirContext, theMsg.getPayload(myFhirContext), theMsg.getOperationType());
outgoingMsg.setTransactionId(theMsg.getTransactionId());
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg)
.add(ResourceOperationMessage.class, outgoingMsg)
.add(TransactionLogMessages.class, empiContext.getTransactionLogMessages());
myInterceptorBroadcaster.callHooks(Pointcut.EMPI_AFTER_PERSISTED_RESOURCE_CHECKED, params);
}
}
private EmpiTransactionContext createEmpiContext(ResourceModifiedMessage theMsg) {
TransactionLogMessages transactionLogMessages = TransactionLogMessages.createFromTransactionGuid(theMsg.getParentTransactionGuid());
TransactionLogMessages transactionLogMessages = TransactionLogMessages.createFromTransactionGuid(theMsg.getTransactionId());
EmpiTransactionContext.OperationType empiOperation;
switch (theMsg.getOperationType()) {
case CREATE:

View File

@ -64,7 +64,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
payload.setParentTransactionGuid(theMsg.getParentTransactionGuid());
payload.setTransactionId(theMsg.getTransactionId());
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
theChannelProducer.send(message);
ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());

View File

@ -37,11 +37,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired
DaoRegistry myDaoRegistry;
@Autowired
MatchUrlService myMatchUrlService;
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired
private FhirContext myCtx;

View File

@ -23,13 +23,13 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -86,7 +86,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
matchActiveSubscriptionsAndDeliver(msg);
}
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
@ -164,7 +163,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setParentTransactionGuid(theMsg.getParentTransactionGuid());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED

View File

@ -21,10 +21,10 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -1,49 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
private static final long serialVersionUID = 1L;
@JsonProperty("headers")
private MessageHeaders myHeaders;
/**
* Constructor
*/
public BaseJsonMessage() {
super();
}
@Override
public MessageHeaders getHeaders() {
return myHeaders;
}
public void setHeaders(MessageHeaders theHeaders) {
myHeaders = theHeaders;
}
}

View File

@ -1,25 +0,0 @@
package ca.uhn.fhir.jpa.subscription.model;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IResourceMessage {
String getPayloadId();
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.model;
* #L%
*/
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;

View File

@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.subscription.model;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.rest.server.messaging.IResourceMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -38,22 +40,10 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
private CanonicalSubscription mySubscription;
@JsonProperty("payload")
private String myPayloadString;
@JsonIgnore
private transient IBaseResource myPayload;
@JsonProperty("payloadId")
private String myPayloadId;
@JsonProperty("parentTransactionGuid")
private String myParentTransactionGuid;
@JsonProperty("operationType")
private ResourceModifiedMessage.OperationTypeEnum myOperationType;
public String getParentTransactionGuid() {
return myParentTransactionGuid;
}
public void setParentTransactionGuid(String theParentTransactionGuid) {
myParentTransactionGuid = theParentTransactionGuid;
}
@JsonIgnore
private transient IBaseResource myPayloadDecoded;
/**
* Constructor
@ -62,20 +52,12 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
super();
}
public ResourceModifiedMessage.OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
public IBaseResource getPayload(FhirContext theCtx) {
IBaseResource retVal = myPayload;
IBaseResource retVal = myPayloadDecoded;
if (retVal == null && isNotBlank(myPayloadString)) {
IParser parser = EncodingEnum.detectEncoding(myPayloadString).newParser(theCtx);
retVal = parser.parseResource(myPayloadString);
myPayload = retVal;
myPayloadDecoded = retVal;
}
return retVal;
}
@ -133,9 +115,9 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
return new ToStringBuilder(this)
.append("mySubscription", mySubscription)
.append("myPayloadString", myPayloadString)
.append("myPayload", myPayload)
.append("myPayload", myPayloadDecoded)
.append("myPayloadId", myPayloadId)
.append("myOperationType", myOperationType)
.append("myOperationType", getOperationType())
.toString();
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.model;
* #L%
*/
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;

View File

@ -21,40 +21,24 @@ package ca.uhn.fhir.jpa.subscription.model;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.List;
/**
* Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage
* that doesn't require knowledge of subscriptions.
*/
public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class ResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("resourceId")
private String myId;
@JsonProperty("operationType")
private OperationTypeEnum myOperationType;
/**
* This will only be set if the resource is being triggered for a specific
* subscription
*/
@JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId;
@JsonProperty("payload")
private String myPayload;
@JsonProperty("payloadId")
private String myPayloadId;
@JsonProperty("parentTransactionGuid")
private String myParentTransactionGuid;
@JsonIgnore
private transient IBaseResource myPayloadDecoded;
/**
* Constructor
@ -64,25 +48,13 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
}
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
this();
setId(theResource.getIdElement());
setOperationType(theOperationType);
if (theOperationType != OperationTypeEnum.DELETE) {
setNewPayload(theFhirContext, theResource);
}
super(theFhirContext, theResource, theOperationType);
}
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
this(theFhirContext, theNewResource, theOperationType);
if (theRequest != null) {
setParentTransactionGuid(theRequest.getTransactionGuid());
}
super(theFhirContext, theNewResource, theOperationType, theRequest);
}
@Override
public String getPayloadId() {
return myPayloadId;
}
public String getSubscriptionId() {
return mySubscriptionId;
@ -92,98 +64,6 @@ public class ResourceModifiedMessage extends BaseResourceMessage implements IRes
mySubscriptionId = theSubscriptionId;
}
public String getId() {
return myId;
}
public IIdType getId(FhirContext theCtx) {
IIdType retVal = null;
if (myId != null) {
retVal = theCtx.getVersion().newIdType().setValue(myId);
}
return retVal;
}
public IBaseResource getNewPayload(FhirContext theCtx) {
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
}
return myPayloadDecoded;
}
public OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
public void setId(IIdType theId) {
myId = null;
if (theId != null) {
myId = theId.getValue();
}
}
public String getParentTransactionGuid() {
return myParentTransactionGuid;
}
public void setParentTransactionGuid(String theParentTransactionGuid) {
myParentTransactionGuid = theParentTransactionGuid;
}
private void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
/*
* References with placeholders would be invalid by the time we get here, and
* would be caught before we even get here. This check is basically a last-ditch
* effort to make sure nothing has broken in the various safeguards that
* should prevent this from happening (hence it only being an assert as
* opposed to something executed all the time).
*/
assert payloadContainsNoPlaceholderReferences(theCtx, theNewPayload);
/*
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
* it doesn't actually get used if anyone is doing subscriptions at any
* scale using a queue engine, and not going through the serialize/deserialize
* as we would in a queue engine can mask bugs.
* -JA
*/
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
}
public enum OperationTypeEnum {
CREATE,
UPDATE,
DELETE,
MANUALLY_TRIGGERED
}
private static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
for (ResourceReferenceInfo next : refs) {
String ref = next.getResourceReference().getReferenceElement().getValue();
if (isBlank(ref)) {
IBaseResource resource = next.getResourceReference().getResource();
if (resource != null) {
ref = resource.getIdElement().getValue();
}
}
if (isNotBlank(ref)) {
if (ref.startsWith("#")) {
continue;
}
if (ref.startsWith("urn:uuid:")) {
throw new AssertionError("Reference at " + next.getName() + " is invalid: " + ref);
}
}
}
return true;
}
@Override
public String toString() {
return new ToStringBuilder(this)

View File

@ -26,7 +26,6 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
@ -38,6 +37,7 @@ import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;

View File

@ -25,8 +25,8 @@ import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -3,12 +3,12 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
@ -18,18 +18,20 @@ import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class BaseSubscriptionDeliverySubscriberTest {

View File

@ -33,7 +33,6 @@ public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest
return "shared";
}
}
}
@Test

View File

@ -10,7 +10,26 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.util.UrlUtil;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.BodySite;
import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Coding;
import org.hl7.fhir.dstu3.model.CommunicationRequest;
import org.hl7.fhir.dstu3.model.DateTimeType;
import org.hl7.fhir.dstu3.model.Dosage;
import org.hl7.fhir.dstu3.model.Enumerations;
import org.hl7.fhir.dstu3.model.EpisodeOfCare;
import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Location;
import org.hl7.fhir.dstu3.model.MedicationRequest;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Patient;
import org.hl7.fhir.dstu3.model.Procedure;
import org.hl7.fhir.dstu3.model.ProcedureRequest;
import org.hl7.fhir.dstu3.model.Provenance;
import org.hl7.fhir.dstu3.model.QuestionnaireResponse;
import org.hl7.fhir.dstu3.model.Reference;
import org.hl7.fhir.dstu3.model.SearchParameter;
import org.hl7.fhir.dstu3.model.Timing;
import org.hl7.fhir.dstu3.model.codesystems.MedicationRequestCategory;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -89,10 +108,8 @@ public class InMemorySubscriptionMatcherR3Test extends BaseSubscriptionDstu3Test
pr.setSubject(new Reference("Patient/"));
assertMatched(pr, "ProcedureRequest?intent=original-order");
assertNotMatched(pr, "ProcedureRequest?subject=Patient/123");
}
@Test
public void testResourceById() {

View File

@ -73,6 +73,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
</dependencies>

View File

@ -1,29 +1,12 @@
package ca.uhn.fhir.jpa.subscription.model;
package ca.uhn.fhir.rest.server.messaging;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -31,9 +14,15 @@ import java.util.Optional;
@SuppressWarnings("WeakerAccess")
public abstract class BaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("operationType")
protected BaseResourceModifiedMessage.OperationTypeEnum myOperationType;
@JsonProperty("attributes")
private Map<String, String> myAttributes;
@JsonProperty("transactionId")
private String myTransactionId;
/**
* Returns an attribute stored in this message.
* <p>
@ -94,4 +83,52 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
myAttributes.putAll(theMsg.myAttributes);
}
}
/**
* Returns the {@link OperationTypeEnum} that is occurring to the Resource of the message
*
* @return the operation type.
*/
public BaseResourceModifiedMessage.OperationTypeEnum getOperationType() {
return myOperationType;
}
/**
* Sets the {@link OperationTypeEnum} occuring to the resource of the message.
*
* @param theOperationType The operation type to set.
*/
public void setOperationType(BaseResourceModifiedMessage.OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
/**
* Retrieve the transaction ID related to this message.
*
* @return the transaction ID, or null.
*/
@Nullable
public String getTransactionId() {
return myTransactionId;
}
/**
* Adds a transaction ID to this message. This ID can be used for many purposes. For example, performing tracing
* across asynchronous hooks, tying data together, or downstream logging purposes.
*
* One current internal implementation uses this field to tie back EMPI processing results (which are asynchronous)
* to the original transaction log that caused the EMPI processing to occur.
*
* @param theTransactionId An ID representing a transaction of relevance to this message.
*/
public void setTransactionId(String theTransactionId) {
myTransactionId = theTransactionId;
}
public enum OperationTypeEnum {
CREATE,
UPDATE,
DELETE,
MANUALLY_TRIGGERED
}
}

View File

@ -0,0 +1,157 @@
package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public abstract class BaseResourceModifiedMessage extends BaseResourceMessage implements IResourceMessage, IModelJson {
@JsonProperty("resourceId")
protected String myId;
@JsonProperty("payload")
protected String myPayload;
@JsonProperty("payloadId")
protected String myPayloadId;
@JsonIgnore
protected transient IBaseResource myPayloadDecoded;
/**
* Constructor
*/
public BaseResourceModifiedMessage() {
super();
}
public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
this();
setId(theResource.getIdElement());
setOperationType(theOperationType);
if (theOperationType != OperationTypeEnum.DELETE) {
setNewPayload(theFhirContext, theResource);
}
}
public BaseResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
this(theFhirContext, theNewResource, theOperationType);
if (theRequest != null) {
setTransactionId(theRequest.getTransactionGuid());
}
}
@Override
public String getPayloadId() {
return myPayloadId;
}
public String getId() {
return myId;
}
public IIdType getId(FhirContext theCtx) {
IIdType retVal = null;
if (myId != null) {
retVal = theCtx.getVersion().newIdType().setValue(myId);
}
return retVal;
}
public IBaseResource getNewPayload(FhirContext theCtx) {
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
}
return myPayloadDecoded;
}
public IBaseResource getPayload(FhirContext theCtx) {
IBaseResource retVal = myPayloadDecoded;
if (retVal == null && isNotBlank(myPayload)) {
IParser parser = EncodingEnum.detectEncoding(myPayload).newParser(theCtx);
retVal = parser.parseResource(myPayload);
myPayloadDecoded = retVal;
}
return retVal;
}
public String getPayloadString() {
if (this.myPayload != null) {
return this.myPayload;
}
return "";
}
public void setId(IIdType theId) {
myId = null;
if (theId != null) {
myId = theId.getValue();
}
}
protected void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
/*
* References with placeholders would be invalid by the time we get here, and
* would be caught before we even get here. This check is basically a last-ditch
* effort to make sure nothing has broken in the various safeguards that
* should prevent this from happening (hence it only being an assert as
* opposed to something executed all the time).
*/
assert payloadContainsNoPlaceholderReferences(theCtx, theNewPayload);
/*
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
* it doesn't actually get used if anyone is doing subscriptions at any
* scale using a queue engine, and not going through the serialize/deserialize
* as we would in a queue engine can mask bugs.
* -JA
*/
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
}
protected static boolean payloadContainsNoPlaceholderReferences(FhirContext theCtx, IBaseResource theNewPayload) {
List<ResourceReferenceInfo> refs = theCtx.newTerser().getAllResourceReferences(theNewPayload);
for (ResourceReferenceInfo next : refs) {
String ref = next.getResourceReference().getReferenceElement().getValue();
if (isBlank(ref)) {
IBaseResource resource = next.getResourceReference().getResource();
if (resource != null) {
ref = resource.getIdElement().getValue();
}
}
if (isNotBlank(ref)) {
if (ref.startsWith("#")) {
continue;
}
if (ref.startsWith("urn:uuid:")) {
throw new AssertionError("Reference at " + next.getName() + " is invalid: " + ref);
}
}
}
return true;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myId", myId)
.append("myOperationType", myOperationType)
// .append("myPayload", myPayload)
.append("myPayloadId", myPayloadId)
// .append("myPayloadDecoded", myPayloadDecoded)
.toString();
}
}

View File

@ -0,0 +1,7 @@
package ca.uhn.fhir.rest.server.messaging;
public interface IResourceMessage {
String getPayloadId();
}

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
public class ResourceOperationMessage extends BaseResourceModifiedMessage {
public ResourceOperationMessage() {
}
public ResourceOperationMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
super(theFhirContext, theResource, theOperationType);
}
public ResourceOperationMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
super(theFhirContext, theNewResource, theOperationType, theRequest);
}
}

View File

@ -0,0 +1,41 @@
package ca.uhn.fhir.rest.server.messaging.json;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public abstract class BaseJsonMessage<T> implements Message<T>, IModelJson {
private static final long serialVersionUID = 1L;
@JsonProperty("headers")
private HapiMessageHeaders myHeaders;
/**
* Constructor
*/
public BaseJsonMessage() {
super();
setDefaultRetryHeaders();
}
protected void setDefaultRetryHeaders() {
HapiMessageHeaders messageHeaders = new HapiMessageHeaders();
setHeaders(messageHeaders);
}
@Override
public MessageHeaders getHeaders() {
return myHeaders.toMessageHeaders();
}
public HapiMessageHeaders getHapiHeaders() {
return myHeaders;
}
public void setHeaders(HapiMessageHeaders theHeaders) {
myHeaders = theHeaders;
}
}

View File

@ -0,0 +1,75 @@
package ca.uhn.fhir.rest.server.messaging.json;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.messaging.MessageHeaders;
import java.util.HashMap;
import java.util.Map;
/**
* This class is for holding headers for BaseJsonMessages. Any serializable data can be thrown into
* the header map. There are also three special headers, defined by the constants in this class, which are for use
* in message handling retrying. There are also matching helper functions for fetching those special variables; however
* they can also be accessed in standard map fashion with a `get` on the map.
*/
public class HapiMessageHeaders implements IModelJson {
public static final String RETRY_COUNT_KEY = "retryCount";
public static final String FIRST_FAILURE_KEY = "firstFailureTimestamp";
public static final String LAST_FAILURE_KEY = "lastFailureTimestamp";
@JsonProperty(RETRY_COUNT_KEY)
private Integer myRetryCount = 0;
@JsonProperty(FIRST_FAILURE_KEY)
private Long myFirstFailureTimestamp;
@JsonProperty(LAST_FAILURE_KEY)
private Long myLastFailureTimestamp;
@JsonProperty("customHeaders")
private final Map<String, Object> headers;
public HapiMessageHeaders(Map<String, Object> theHeaders) {
headers = theHeaders;
}
public HapiMessageHeaders() {
headers = new HashMap<>();
}
public Integer getRetryCount() {
return this.myRetryCount;
}
public Long getFirstFailureDate() {
return this.myFirstFailureTimestamp;
}
public Long getLastFailureDate() {
return this.myLastFailureTimestamp;
}
public void setRetryCount(Integer theRetryCount) {
this.myRetryCount = theRetryCount;
}
public void setLastFailureDate(Long theLastFailureDate) {
this.myLastFailureTimestamp = theLastFailureDate;
}
public void setFirstFailureDate(Long theFirstFailureDate) {
this.myFirstFailureTimestamp = theFirstFailureDate;
}
public Map<String, Object> getCustomHeaders() {
return this.headers;
}
public MessageHeaders toMessageHeaders() {
Map<String, Object> returnedHeaders = new HashMap<>(this.headers);
returnedHeaders.put(RETRY_COUNT_KEY, myRetryCount);
returnedHeaders.put(FIRST_FAILURE_KEY, myFirstFailureTimestamp);
returnedHeaders.put(LAST_FAILURE_KEY, myLastFailureTimestamp);
return new MessageHeaders(returnedHeaders);
}
}

View File

@ -0,0 +1,49 @@
package ca.uhn.fhir.rest.server.messaging.json;
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class ResourceOperationJsonMessage extends BaseJsonMessage<ResourceOperationMessage> {
@JsonProperty("payload")
private ResourceOperationMessage myPayload;
/**
* Constructor
*/
public ResourceOperationJsonMessage() {
super();
}
/**
* Constructor
*/
public ResourceOperationJsonMessage(ResourceOperationMessage thePayload) {
myPayload = thePayload;
setDefaultRetryHeaders();
}
public ResourceOperationJsonMessage(HapiMessageHeaders theRetryMessageHeaders, ResourceOperationMessage thePayload) {
myPayload = thePayload;
setHeaders(theRetryMessageHeaders);
}
@Override
public ResourceOperationMessage getPayload() {
return myPayload;
}
public void setPayload(ResourceOperationMessage thePayload) {
myPayload = thePayload;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myPayload", myPayload)
.toString();
}
}

View File

@ -0,0 +1,33 @@
package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.rest.server.messaging.json.ResourceOperationJsonMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.FIRST_FAILURE_KEY;
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.LAST_FAILURE_KEY;
import static ca.uhn.fhir.rest.server.messaging.json.HapiMessageHeaders.RETRY_COUNT_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
class ResourceOperationMessageTest {
@Test
public void testSerializationAndDeserializationOfResourceModifiedMessage() throws JsonProcessingException {
ResourceOperationJsonMessage jsonMessage = new ResourceOperationJsonMessage();
jsonMessage.setPayload(new ResourceOperationMessage());
ObjectMapper mapper = new ObjectMapper();
String serialized = mapper.writeValueAsString(jsonMessage);
jsonMessage = mapper.readValue(serialized, ResourceOperationJsonMessage.class);
assertThat(jsonMessage.getHapiHeaders().getRetryCount(), is(equalTo(0)));
assertThat(jsonMessage.getHapiHeaders().getFirstFailureDate(), is(equalTo(null)));
assertThat(jsonMessage.getHapiHeaders().getLastFailureDate(), is(equalTo(null)));
assertThat(jsonMessage.getHeaders().get(RETRY_COUNT_KEY), is(equalTo(0)));
assertThat(jsonMessage.getHeaders().get(FIRST_FAILURE_KEY), is(equalTo(null)));
assertThat(jsonMessage.getHeaders().get(LAST_FAILURE_KEY), is(equalTo(null)));
}
}

View File

@ -1824,7 +1824,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<version>1.18</version>
<version>1.19</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>