Ongoing work on subscriptions

This commit is contained in:
James Agnew 2017-09-09 09:53:51 -07:00
parent 836d4d051b
commit 736e037b1a
13 changed files with 635 additions and 100 deletions

View File

@ -214,6 +214,12 @@ public class ValidatorExamples {
return null; return null;
} }
@Override
public List<IBaseResource> fetchAllConformanceResources(FhirContext theContext) {
// TODO: implement
return null;
}
@Override @Override
public List<StructureDefinition> fetchAllStructureDefinitions(FhirContext theContext) { public List<StructureDefinition> fetchAllStructureDefinitions(FhirContext theContext) {
// TODO: implement // TODO: implement

View File

@ -165,11 +165,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
@Autowired @Autowired
private IResourceIndexedCompositeStringUniqueDao myResourceIndexedCompositeStringUniqueDao; private IResourceIndexedCompositeStringUniqueDao myResourceIndexedCompositeStringUniqueDao;
private <T extends IBaseResource> void autoCreateResource(T theResource) {
IFhirResourceDao<T> dao = (IFhirResourceDao<T>) getDao(theResource.getClass());
dao.create(theResource);
}
protected void clearRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) { protected void clearRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) {
if (theRequestDetails != null) { if (theRequestDetails != null) {
theRequestDetails.getUserData().remove(PROCESSING_SUB_REQUEST); theRequestDetails.getUserData().remove(PROCESSING_SUB_REQUEST);
@ -423,11 +418,13 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
if (getConfig().isAutoCreatePlaceholderReferenceTargets()) { if (getConfig().isAutoCreatePlaceholderReferenceTargets()) {
IBaseResource newResource = missingResourceDef.newInstance(); IBaseResource newResource = missingResourceDef.newInstance();
newResource.setId(resName + "/" + id); newResource.setId(resName + "/" + id);
autoCreateResource(newResource); IFhirResourceDao<IBaseResource> placeholderResourceDao = (IFhirResourceDao<IBaseResource>) getDao(newResource.getClass());
} ourLog.info("Automatically creating empty placeholder resource: {}", newResource.getIdElement().getValue());
valueOf = placeholderResourceDao.update(newResource).getEntity().getId();
} else {
throw new InvalidRequestException("Resource " + resName + "/" + id + " not found, specified in path: " + nextPathsUnsplit); throw new InvalidRequestException("Resource " + resName + "/" + id + " not found, specified in path: " + nextPathsUnsplit);
} }
}
ResourceTable target = myEntityManager.find(ResourceTable.class, valueOf); ResourceTable target = myEntityManager.find(ResourceTable.class, valueOf);
RuntimeResourceDefinition targetResourceDef = getContext().getResourceDefinition(type); RuntimeResourceDefinition targetResourceDef = getContext().getResourceDefinition(type);
if (target == null) { if (target == null) {

View File

@ -662,6 +662,7 @@ public class FhirSystemDaoDstu3 extends BaseHapiFhirSystemDao<Bundle, Meta> {
String nextReplacementIdPart = nextReplacementId.getValueAsString(); String nextReplacementIdPart = nextReplacementId.getValueAsString();
if (nextTemporaryId.isUrn() && nextTemporaryIdPart.length() > IdType.URN_PREFIX.length()) { if (nextTemporaryId.isUrn() && nextTemporaryIdPart.length() > IdType.URN_PREFIX.length()) {
matchUrl = matchUrl.replace(nextTemporaryIdPart, nextReplacementIdPart); matchUrl = matchUrl.replace(nextTemporaryIdPart, nextReplacementIdPart);
matchUrl = matchUrl.replace(UrlUtil.escape(nextTemporaryIdPart), nextReplacementIdPart);
} }
} }
} }

View File

@ -662,6 +662,7 @@ public class FhirSystemDaoR4 extends BaseHapiFhirSystemDao<Bundle, Meta> {
String nextReplacementIdPart = nextReplacementId.getValueAsString(); String nextReplacementIdPart = nextReplacementId.getValueAsString();
if (nextTemporaryId.isUrn() && nextTemporaryIdPart.length() > IdType.URN_PREFIX.length()) { if (nextTemporaryId.isUrn() && nextTemporaryIdPart.length() > IdType.URN_PREFIX.length()) {
matchUrl = matchUrl.replace(nextTemporaryIdPart, nextReplacementIdPart); matchUrl = matchUrl.replace(nextTemporaryIdPart, nextReplacementIdPart);
matchUrl = matchUrl.replace(UrlUtil.escape(nextTemporaryIdPart), nextReplacementIdPart);
} }
} }
} }

View File

@ -46,10 +46,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.Enumeration; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter { public abstract class BaseSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
@ -63,13 +60,15 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000; private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
private SubscribableChannel myProcessingChannel; private SubscribableChannel myProcessingChannel;
private SubscribableChannel myDeliveryChannel; private SubscribableChannel myDeliveryChannel;
private ExecutorService myExecutor; private ExecutorService myProcessingExecutor;
private int myExecutorThreadCount; private int myExecutorThreadCount;
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
private MessageHandler mySubscriptionCheckingSubscriber; private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private BlockingQueue<Runnable> myExecutorQueue; private ThreadPoolExecutor myDeliveryExecutor;
private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
private LinkedBlockingQueue<Runnable> myDeliveryExecutorQueue;
/** /**
* Constructor * Constructor
@ -89,8 +88,8 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
myDeliveryChannel = theDeliveryChannel; myDeliveryChannel = theDeliveryChannel;
} }
public BlockingQueue<Runnable> getExecutorQueueForUnitTests() { public int getExecutorQueueSizeForUnitTests() {
return myExecutorQueue; return myProcessingExecutorQueue.size() + myDeliveryExecutorQueue.size();
} }
public int getExecutorThreadCount() { public int getExecutorThreadCount() {
@ -157,15 +156,16 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
@PostConstruct @PostConstruct
public void postConstruct() { public void postConstruct() {
myExecutorQueue = new LinkedBlockingQueue<>(1000); {
myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override @Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myExecutorQueue.size()); ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
try { try {
myExecutorQueue.put(theRunnable); myProcessingExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) { } catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() + throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString()); " rejected from " + theE.toString());
@ -174,25 +174,55 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
} }
}; };
ThreadFactory threadFactory = new BasicThreadFactory.Builder() ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-%d") .namingPattern("subscription-proc-%d")
.daemon(false) .daemon(false)
.priority(Thread.NORM_PRIORITY) .priority(Thread.NORM_PRIORITY)
.build(); .build();
myExecutor = new ThreadPoolExecutor( myProcessingExecutor = new ThreadPoolExecutor(
1, 1,
getExecutorThreadCount(), getExecutorThreadCount(),
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
myExecutorQueue, myProcessingExecutorQueue,
threadFactory, threadFactory,
rejectedExecutionHandler); rejectedExecutionHandler);
}
{
myDeliveryExecutorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-delivery-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler2 = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myDeliveryExecutorQueue.size());
StopWatch sw = new StopWatch();
try {
myDeliveryExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
myDeliveryExecutor = new ThreadPoolExecutor(
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
myDeliveryExecutorQueue,
threadFactory,
rejectedExecutionHandler2);
}
if (getProcessingChannel() == null) { if (getProcessingChannel() == null) {
setProcessingChannel(new ExecutorSubscribableChannel(myExecutor)); setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
} }
if (getDeliveryChannel() == null) { if (getDeliveryChannel() == null) {
setDeliveryChannel(new ExecutorSubscribableChannel(myExecutor)); setDeliveryChannel(new ExecutorSubscribableChannel(myDeliveryExecutor));
} }
if (mySubscriptionActivatingSubscriber == null) { if (mySubscriptionActivatingSubscriber == null) {

View File

@ -21,12 +21,14 @@ package ca.uhn.fhir.jpa.subscription;
*/ */
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum; import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor; import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.gclient.IClientExecutable; import ca.uhn.fhir.rest.gclient.IClientExecutable;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
@ -78,7 +80,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
return; return;
} }
try {
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) { if (!subscriptionTypeApplies(getContext(), msg.getSubscription())) {
@ -90,11 +92,12 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
// Grab the endpoint from the subscription // Grab the endpoint from the subscription
IPrimitiveType<?> endpoint = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_ENDPOINT, IPrimitiveType.class); IPrimitiveType<?> endpoint = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_ENDPOINT, IPrimitiveType.class);
String endpointUrl = endpoint.getValueAsString(); String endpointUrl = endpoint != null ? endpoint.getValueAsString() : null;
// Grab the payload type (encoding mimetype) from the subscription // Grab the payload type (encoding mimetype) from the subscription
IPrimitiveType<?> payload = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_PAYLOAD, IPrimitiveType.class); IPrimitiveType<?> payload = getContext().newTerser().getSingleValueOrNull(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_PAYLOAD, IPrimitiveType.class);
String payloadString = payload.getValueAsString(); String payloadString = payload != null ? payload.getValueAsString() : null;
payloadString = StringUtils.defaultString(payloadString, Constants.CT_FHIR_XML_NEW);
if (payloadString.contains(";")) { if (payloadString.contains(";")) {
payloadString = payloadString.substring(0, payloadString.indexOf(';')); payloadString = payloadString.substring(0, payloadString.indexOf(';'));
} }
@ -104,7 +107,9 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
// Create the client request // Create the client request
getContext().getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER); getContext().getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
IGenericClient client = getContext().newRestfulGenericClient(endpointUrl); IGenericClient client = null;
if (isNotBlank(endpointUrl)) {
client = getContext().newRestfulGenericClient(endpointUrl);
// Additional headers specified in the subscription // Additional headers specified in the subscription
List<IPrimitiveType> headers = getContext().newTerser().getValues(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_HEADER, IPrimitiveType.class); List<IPrimitiveType> headers = getContext().newTerser().getValues(subscription, BaseSubscriptionInterceptor.SUBSCRIPTION_HEADER, IPrimitiveType.class);
@ -113,9 +118,13 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionSu
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next.getValueAsString())); client.registerInterceptor(new SimpleRequestHeaderInterceptor(next.getValueAsString()));
} }
} }
}
deliverPayload(msg, subscription, payloadType, client); deliverPayload(msg, subscription, payloadType, client);
} catch (Exception e) {
ourLog.error("Failure handling subscription payload", e);
throw new MessagingException(theMessage, "Failure handling subscription payload", e);
}
} }
} }

View File

@ -57,6 +57,9 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
private static JpaValidationSupportChainDstu3 ourJpaValidationSupportChainDstu3; private static JpaValidationSupportChainDstu3 ourJpaValidationSupportChainDstu3;
private static IFhirResourceDaoValueSet<ValueSet, Coding, CodeableConcept> ourValueSetDao; private static IFhirResourceDaoValueSet<ValueSet, Coding, CodeableConcept> ourValueSetDao;
@Autowired
@Qualifier("myCoverageDaoDstu3")
protected IFhirResourceDao<Coverage> myCoverageDao;
@Autowired @Autowired
protected IResourceIndexedCompositeStringUniqueDao myResourceIndexedCompositeStringUniqueDao; protected IResourceIndexedCompositeStringUniqueDao myResourceIndexedCompositeStringUniqueDao;
@Autowired @Autowired

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.DateParam; import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.TestUtil; import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.dstu3.model.*; import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.Enumerations.PublicationStatus; import org.hl7.fhir.dstu3.model.Enumerations.PublicationStatus;
@ -36,6 +37,185 @@ public class FhirResourceDaoDstu3UniqueSearchParamTest extends BaseJpaDstu3Test
myDaoConfig.setDefaultSearchParamsCanBeOverridden(new DaoConfig().isDefaultSearchParamsCanBeOverridden()); myDaoConfig.setDefaultSearchParamsCanBeOverridden(new DaoConfig().isDefaultSearchParamsCanBeOverridden());
} }
@Test
public void testIndexTransactionWithMatchUrl() {
Patient pt2 = new Patient();
pt2.setGender(Enumerations.AdministrativeGender.MALE);
pt2.setBirthDateElement(new DateType("2011-01-02"));
IIdType id2 = myPatientDao.create(pt2).getId().toUnqualifiedVersionless();
Coverage cov = new Coverage();
cov.getBeneficiary().setReference(id2.getValue());
cov.addIdentifier().setSystem("urn:foo:bar").setValue("123");
IIdType id3 = myCoverageDao.create(cov).getId().toUnqualifiedVersionless();
createUniqueIndexCoverageBeneficiary();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
assertEquals("Coverage/" + id3.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Coverage?beneficiary=Patient%2F" + id2.getIdPart() + "&identifier=urn%3Afoo%3Abar%7C123", uniques.get(0).getIndexString());
}
@Test
public void testIndexTransactionWithMatchUrl2() {
createUniqueIndexCoverageBeneficiary();
String input = "{\n" +
" \"resourceType\": \"Bundle\",\n" +
" \"type\": \"transaction\",\n" +
" \"entry\": [\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Patient\",\n" +
" \"identifier\": [\n" +
" {\n" +
" \"use\": \"official\",\n" +
" \"type\": {\n" +
" \"coding\": [\n" +
" {\n" +
" \"system\": \"http://hl7.org/fhir/v2/0203\",\n" +
" \"code\": \"MR\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"system\": \"FOOORG:FOOSITE:patientid:MR:R\",\n" +
" \"value\": \"007811959\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Patient?identifier=FOOORG%3AFOOSITE%3Apatientid%3AMR%3AR%7C007811959%2CFOOORG%3AFOOSITE%3Apatientid%3AMR%3AB%7C000929990%2CFOOORG%3AFOOSITE%3Apatientid%3API%3APH%7C00589363%2Chttp%3A%2F%2Fhl7.org%2Ffhir%2Fsid%2Fus-ssn%7C657-01-8133\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:b58ff639-11d1-4dac-942f-abf4f9a625d7\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Coverage\",\n" +
" \"identifier\": [\n" +
" {\n" +
" \"system\": \"FOOORG:FOOSITE:coverage:planId\",\n" +
" \"value\": \"0403-010101\"\n" +
" }\n" +
" ],\n" +
" \"beneficiary\": {\n" +
" \"reference\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\"\n" +
" }\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Coverage?beneficiary=urn%3Auuid%3Ad2a46176-8e15-405d-bbda-baea1a9dc7f3&identifier=FOOORG%3AFOOSITE%3Acoverage%3AplanId%7C0403-010101\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:13f5da1a-6601-4c1a-82c9-41527be23fa0\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Coverage\",\n" +
" \"contained\": [\n" +
" {\n" +
" \"resourceType\": \"RelatedPerson\",\n" +
" \"id\": \"1\",\n" +
" \"name\": [\n" +
" {\n" +
" \"family\": \"SMITH\",\n" +
" \"given\": [\n" +
" \"FAKER\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"resourceType\": \"Organization\",\n" +
" \"id\": \"2\",\n" +
" \"name\": \"MEDICAID\"\n" +
" }\n" +
" ],\n" +
" \"identifier\": [\n" +
" {\n" +
" \"system\": \"FOOORG:FOOSITE:coverage:planId\",\n" +
" \"value\": \"0404-010101\"\n" +
" }\n" +
" ],\n" +
" \"policyHolder\": {\n" +
" \"reference\": \"#1\"\n" +
" },\n" +
" \"beneficiary\": {\n" +
" \"reference\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\"\n" +
" },\n" +
" \"payor\": [\n" +
" {\n" +
" \"reference\": \"#2\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Coverage?beneficiary=urn%3Auuid%3Ad2a46176-8e15-405d-bbda-baea1a9dc7f3&identifier=FOOORG%3AFOOSITE%3Acoverage%3AplanId%7C0404-010101\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
Bundle inputBundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, input);
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(inputBundle));
mySystemDao.transaction(mySrd, inputBundle);
inputBundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, input);
mySystemDao.transaction(mySrd, inputBundle);
}
private void createUniqueIndexCoverageBeneficiary() {
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/coverage-beneficiary");
sp.setCode("beneficiary");
sp.setExpression("Coverage.beneficiary");
sp.setType(Enumerations.SearchParamType.REFERENCE);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/coverage-identifier");
sp.setCode("identifier");
sp.setExpression("Coverage.identifier");
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/coverage-beneficiary-identifier");
sp.setCode("coverage-beneficiary-identifier");
sp.setExpression("Coverage.beneficiary");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
sp.addComponent()
.setExpression("Coverage")
.setDefinition(new Reference("/SearchParameter/coverage-beneficiary"));
sp.addComponent()
.setExpression("Coverage")
.setDefinition(new Reference("/SearchParameter/coverage-identifier"));
sp.addExtension()
.setUrl(JpaConstants.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParamRegsitry.forceRefresh();
}
@Before @Before
public void before() { public void before() {
myDaoConfig.setDefaultSearchParamsCanBeOverridden(true); myDaoConfig.setDefaultSearchParamsCanBeOverridden(true);
@ -143,7 +323,7 @@ public class FhirResourceDaoDstu3UniqueSearchParamTest extends BaseJpaDstu3Test
try { try {
myPatientDao.create(pt1).getId().toUnqualifiedVersionless(); myPatientDao.create(pt1).getId().toUnqualifiedVersionless();
fail(); fail();
} catch (JpaSystemException e) { } catch (PreconditionFailedException e) {
// good // good
} }

View File

@ -146,6 +146,9 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Qualifier("myPatientDaoR4") @Qualifier("myPatientDaoR4")
protected IFhirResourceDaoPatient<Patient> myPatientDao; protected IFhirResourceDaoPatient<Patient> myPatientDao;
@Autowired @Autowired
@Qualifier("myCoverageDaoR4")
protected IFhirResourceDao<Coverage> myCoverageDao;
@Autowired
@Qualifier("myPractitionerDaoR4") @Qualifier("myPractitionerDaoR4")
protected IFhirResourceDao<Practitioner> myPractitionerDao; protected IFhirResourceDao<Practitioner> myPractitionerDao;
@Autowired @Autowired

View File

@ -83,12 +83,32 @@ public class FhirResourceDaoCreatePlaceholdersR4Test extends BaseJpaR4Test {
Observation o = new Observation(); Observation o = new Observation();
o.setStatus(ObservationStatus.FINAL); o.setStatus(ObservationStatus.FINAL);
o.getSubject().setReference("Patient/FOO"); o.getSubject().setReference("Patient/FOO");
try {
myObservationDao.create(o, mySrd); myObservationDao.create(o, mySrd);
fail();
} catch (InvalidRequestException e) {
assertEquals("Resource Patient/FOO not found, specified in path: Observation.subject", e.getMessage());
} }
@Test
public void testCreateWithMultiplePlaceholders() {
myDaoConfig.setAutoCreatePlaceholderReferenceTargets(true);
Task task = new Task();
task.addNote().setText("A note");
task.addPartOf().setReference("Task/AAA");
task.addPartOf().setReference("Task/AAA");
task.addPartOf().setReference("Task/AAA");
IIdType id = myTaskDao.create(task).getId().toUnqualifiedVersionless();
task = myTaskDao.read(id);
assertEquals(3, task.getPartOf().size());
assertEquals("Task/AAA", task.getPartOf().get(0).getReference());
assertEquals("Task/AAA", task.getPartOf().get(1).getReference());
assertEquals("Task/AAA", task.getPartOf().get(2).getReference());
SearchParameterMap params = new SearchParameterMap();
params.add(Task.SP_PART_OF, new ReferenceParam("Task/AAA"));
List<String> found = toUnqualifiedVersionlessIdValues(myTaskDao.search(params));
assertThat(found, contains(id.getValue()));
} }
@Test @Test
@ -123,12 +143,7 @@ public class FhirResourceDaoCreatePlaceholdersR4Test extends BaseJpaR4Test {
o.setId(id); o.setId(id);
o.setStatus(ObservationStatus.FINAL); o.setStatus(ObservationStatus.FINAL);
o.getSubject().setReference("Patient/FOO"); o.getSubject().setReference("Patient/FOO");
try {
myObservationDao.update(o, mySrd); myObservationDao.update(o, mySrd);
fail();
} catch (InvalidRequestException e) {
assertEquals("Resource Patient/FOO not found, specified in path: Observation.subject", e.getMessage());
}
} }
@AfterClass @AfterClass

View File

@ -22,6 +22,7 @@ import org.springframework.orm.jpa.JpaSystemException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID;
import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
@ -346,6 +347,182 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
} }
private void createUniqueIndexCoverageBeneficiary() {
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/coverage-beneficiary");
sp.setCode("beneficiary");
sp.setExpression("Coverage.beneficiary");
sp.setType(Enumerations.SearchParamType.REFERENCE);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/coverage-identifier");
sp.setCode("identifier");
sp.setExpression("Coverage.identifier");
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/coverage-beneficiary-identifier");
sp.setCode("coverage-beneficiary-identifier");
sp.setExpression("Coverage.beneficiary");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(PublicationStatus.ACTIVE);
sp.addBase("Coverage");
sp.addComponent()
.setExpression("Coverage")
.setDefinition(new Reference("/SearchParameter/coverage-beneficiary"));
sp.addComponent()
.setExpression("Coverage")
.setDefinition(new Reference("/SearchParameter/coverage-identifier"));
sp.addExtension()
.setUrl(JpaConstants.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParamRegsitry.forceRefresh();
}
@Test
public void testIndexTransactionWithMatchUrl() {
Patient pt2 = new Patient();
pt2.setGender(Enumerations.AdministrativeGender.MALE);
pt2.setBirthDateElement(new DateType("2011-01-02"));
IIdType id2 = myPatientDao.create(pt2).getId().toUnqualifiedVersionless();
Coverage cov = new Coverage();
cov.getBeneficiary().setReference(id2.getValue());
cov.addIdentifier().setSystem("urn:foo:bar").setValue("123");
IIdType id3 = myCoverageDao.create(cov).getId().toUnqualifiedVersionless();
createUniqueIndexCoverageBeneficiary();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
assertEquals("Coverage/" + id3.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Coverage?beneficiary=Patient%2F" + id2.getIdPart() + "&identifier=urn%3Afoo%3Abar%7C123", uniques.get(0).getIndexString());
}
@Test
public void testIndexTransactionWithMatchUrl2() {
createUniqueIndexCoverageBeneficiary();
String input = "{\n" +
" \"resourceType\": \"Bundle\",\n" +
" \"type\": \"transaction\",\n" +
" \"entry\": [\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Patient\",\n" +
" \"identifier\": [\n" +
" {\n" +
" \"use\": \"official\",\n" +
" \"type\": {\n" +
" \"coding\": [\n" +
" {\n" +
" \"system\": \"http://hl7.org/fhir/v2/0203\",\n" +
" \"code\": \"MR\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"system\": \"FOOORG:FOOSITE:patientid:MR:R\",\n" +
" \"value\": \"007811959\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Patient?identifier=FOOORG%3AFOOSITE%3Apatientid%3AMR%3AR%7C007811959%2CFOOORG%3AFOOSITE%3Apatientid%3AMR%3AB%7C000929990%2CFOOORG%3AFOOSITE%3Apatientid%3API%3APH%7C00589363%2Chttp%3A%2F%2Fhl7.org%2Ffhir%2Fsid%2Fus-ssn%7C657-01-8133\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:b58ff639-11d1-4dac-942f-abf4f9a625d7\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Coverage\",\n" +
" \"identifier\": [\n" +
" {\n" +
" \"system\": \"FOOORG:FOOSITE:coverage:planId\",\n" +
" \"value\": \"0403-010101\"\n" +
" }\n" +
" ],\n" +
" \"beneficiary\": {\n" +
" \"reference\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\"\n" +
" }\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Coverage?beneficiary=urn%3Auuid%3Ad2a46176-8e15-405d-bbda-baea1a9dc7f3&identifier=FOOORG%3AFOOSITE%3Acoverage%3AplanId%7C0403-010101\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"fullUrl\": \"urn:uuid:13f5da1a-6601-4c1a-82c9-41527be23fa0\",\n" +
" \"resource\": {\n" +
" \"resourceType\": \"Coverage\",\n" +
" \"contained\": [\n" +
" {\n" +
" \"resourceType\": \"RelatedPerson\",\n" +
" \"id\": \"1\",\n" +
" \"name\": [\n" +
" {\n" +
" \"family\": \"SMITH\",\n" +
" \"given\": [\n" +
" \"FAKER\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"resourceType\": \"Organization\",\n" +
" \"id\": \"2\",\n" +
" \"name\": \"MEDICAID\"\n" +
" }\n" +
" ],\n" +
" \"identifier\": [\n" +
" {\n" +
" \"system\": \"FOOORG:FOOSITE:coverage:planId\",\n" +
" \"value\": \"0404-010101\"\n" +
" }\n" +
" ],\n" +
" \"policyHolder\": {\n" +
" \"reference\": \"#1\"\n" +
" },\n" +
" \"beneficiary\": {\n" +
" \"reference\": \"urn:uuid:d2a46176-8e15-405d-bbda-baea1a9dc7f3\"\n" +
" },\n" +
" \"payor\": [\n" +
" {\n" +
" \"reference\": \"#2\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"request\": {\n" +
" \"method\": \"PUT\",\n" +
" \"url\": \"/Coverage?beneficiary=urn%3Auuid%3Ad2a46176-8e15-405d-bbda-baea1a9dc7f3&identifier=FOOORG%3AFOOSITE%3Acoverage%3AplanId%7C0404-010101\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
Bundle inputBundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, input);
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(inputBundle));
mySystemDao.transaction(mySrd, inputBundle);
inputBundle = myFhirCtx.newJsonParser().parseResource(Bundle.class, input);
mySystemDao.transaction(mySrd, inputBundle);
}
@Test @Test
public void testUniqueValuesAreIndexed_DateAndToken() { public void testUniqueValuesAreIndexed_DateAndToken() {
createUniqueBirthdateAndGenderSps(); createUniqueBirthdateAndGenderSps();
@ -423,6 +600,119 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
assertEquals("Patient?name=GIVEN2&organization=Organization%2FORG", uniques.get(2).getIndexString()); assertEquals("Patient?name=GIVEN2&organization=Organization%2FORG", uniques.get(2).getIndexString());
} }
@Test
public void testUniqueValuesAreIndexed_StringAndReference_UsingConditional() {
createUniqueNameAndManagingOrganizationSps();
List<ResourceIndexedCompositeStringUnique> uniques;
Organization org = new Organization();
org.setId("Organization/ORG");
org.setName("ORG");
myOrganizationDao.update(org);
Patient pt1 = new Patient();
pt1.addName().setFamily("FAMILY1");
pt1.setManagingOrganization(new Reference("Organization/ORG"));
IIdType id1 = myPatientDao.update(pt1, "Patient?name=FAMILY1&organization.name=ORG").getId().toUnqualifiedVersionless();
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(1, uniques.size());
assertEquals("Patient/" + id1.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Patient?name=FAMILY1&organization=Organization%2FORG", uniques.get(0).getIndexString());
// Again
pt1 = new Patient();
pt1.addName().setFamily("FAMILY1");
pt1.setManagingOrganization(new Reference("Organization/ORG"));
id1 = myPatientDao.update(pt1, "Patient?name=FAMILY1&organization.name=ORG").getId().toUnqualifiedVersionless();
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(1, uniques.size());
assertEquals("Patient/" + id1.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Patient?name=FAMILY1&organization=Organization%2FORG", uniques.get(0).getIndexString());
}
@Test
public void testUniqueValuesAreIndexed_StringAndReference_UsingConditionalInTransaction() {
createUniqueNameAndManagingOrganizationSps();
List<ResourceIndexedCompositeStringUnique> uniques;
Organization org = new Organization();
org.setId("Organization/ORG");
org.setName("ORG");
myOrganizationDao.update(org);
Bundle bundle = new Bundle();
bundle.setType(Bundle.BundleType.TRANSACTION);
String orgId = "urn:uuid:" + UUID.randomUUID().toString();
org = new Organization();
org.setName("ORG");
bundle
.addEntry()
.setResource(org)
.setFullUrl(orgId)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("/Organization?name=ORG");
Patient pt1 = new Patient();
pt1.addName().setFamily("FAMILY1");
pt1.setManagingOrganization(new Reference(orgId));
bundle
.addEntry()
.setResource(pt1)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("/Patient?name=FAMILY1&organization=" + orgId.replace(":", "%3A"));
Bundle resp = mySystemDao.transaction(mySrd, bundle);
IIdType id1 = new IdType(resp.getEntry().get(1).getResponse().getLocation());
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(1, uniques.size());
assertEquals("Patient/" + id1.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Patient?name=FAMILY1&organization=Organization%2FORG", uniques.get(0).getIndexString());
// Again
bundle = new Bundle();
bundle.setType(Bundle.BundleType.TRANSACTION);
orgId = IdType.newRandomUuid().getValue();
org = new Organization();
org.setName("ORG");
bundle
.addEntry()
.setResource(org)
.setFullUrl(orgId)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("/Organization?name=ORG");
pt1 = new Patient();
pt1.addName().setFamily("FAMILY1");
pt1.setManagingOrganization(new Reference(orgId));
bundle
.addEntry()
.setResource(pt1)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("/Patient?name=FAMILY1&organization=" + orgId);
resp = mySystemDao.transaction(mySrd, bundle);
id1 = new IdType(resp.getEntry().get(1).getResponse().getLocation());
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(1, uniques.size());
assertEquals("Patient/" + id1.getIdPart(), uniques.get(0).getResource().getIdDt().toUnqualifiedVersionless().getValue());
assertEquals("Patient?name=FAMILY1&organization=Organization%2FORG", uniques.get(0).getIndexString());
}
@Test @Test
public void testUniqueValuesAreNotIndexedIfNotAllParamsAreFound_DateAndToken() { public void testUniqueValuesAreNotIndexedIfNotAllParamsAreFound_DateAndToken() {
createUniqueBirthdateAndGenderSps(); createUniqueBirthdateAndGenderSps();

View File

@ -281,11 +281,11 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
} }
public static void waitForQueueToDrain(BaseSubscriptionInterceptor theRestHookSubscriptionInterceptor) throws InterruptedException { public static void waitForQueueToDrain(BaseSubscriptionInterceptor theRestHookSubscriptionInterceptor) throws InterruptedException {
ourLog.info("QUEUE HAS {} ITEMS", theRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size()); ourLog.info("QUEUE HAS {} ITEMS", theRestHookSubscriptionInterceptor.getExecutorQueueSizeForUnitTests());
while (theRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size() > 0) { while (theRestHookSubscriptionInterceptor.getExecutorQueueSizeForUnitTests() > 0) {
Thread.sleep(50); Thread.sleep(50);
} }
ourLog.info("QUEUE HAS {} ITEMS", theRestHookSubscriptionInterceptor.getExecutorQueueForUnitTests().size()); ourLog.info("QUEUE HAS {} ITEMS", theRestHookSubscriptionInterceptor.getExecutorQueueSizeForUnitTests());
} }
private void waitForQueueToDrain() throws InterruptedException { private void waitForQueueToDrain() throws InterruptedException {

View File

@ -84,11 +84,11 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigR4Test extends Base
} }
private void waitForQueueToDrain() throws InterruptedException { private void waitForQueueToDrain() throws InterruptedException {
ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size()); ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueSizeForUnitTests());
while (getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size() > 0) { while (getRestHookSubscriptionInterceptor().getExecutorQueueSizeForUnitTests() > 0) {
Thread.sleep(250); Thread.sleep(250);
} }
ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueForUnitTests().size()); ourLog.info("QUEUE HAS {} ITEMS", getRestHookSubscriptionInterceptor().getExecutorQueueSizeForUnitTests());
} }
private Observation sendObservation(String code, String system) throws InterruptedException { private Observation sendObservation(String code, String system) throws InterruptedException {