Avoid issues when using subscription delivery in a serializing/queuing
environment
This commit is contained in:
parent
d0b194f9d7
commit
ce3b7c82ce
|
@ -486,284 +486,293 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
|
|||
return myContext.getVersion().newIdType().setValue(theValue);
|
||||
}
|
||||
|
||||
|
||||
private Map<BUNDLEENTRY, ResourceTable> doTransactionWriteOperations(ServletRequestDetails theRequestDetails, String theActionName, Date theUpdateTime, Set<IIdType> theAllIds,
|
||||
Map<IIdType, IIdType> theIdSubstitutions, Map<IIdType, DaoMethodOutcome> theIdToPersistedOutcome, BUNDLE theResponse, IdentityHashMap<BUNDLEENTRY, Integer> theOriginalRequestOrder, List<BUNDLEENTRY> theEntries, StopWatch theTransactionStopWatch) {
|
||||
Set<String> deletedResources = new HashSet<>();
|
||||
List<DeleteConflict> deleteConflicts = new ArrayList<>();
|
||||
Map<BUNDLEENTRY, ResourceTable> entriesToProcess = new IdentityHashMap<>();
|
||||
Set<ResourceTable> nonUpdatedEntities = new HashSet<>();
|
||||
Set<ResourceTable> updatedEntities = new HashSet<>();
|
||||
Map<String, Class<? extends IBaseResource>> conditionalRequestUrls = new HashMap<>();
|
||||
|
||||
/*
|
||||
* Loop through the request and process any entries of type
|
||||
* PUT, POST or DELETE
|
||||
*/
|
||||
for (int i = 0; i < theEntries.size(); i++) {
|
||||
theRequestDetails.startDeferredOperationCallback();
|
||||
try {
|
||||
|
||||
if (i % 100 == 0) {
|
||||
ourLog.debug("Processed {} non-GET entries out of {}", i, theEntries.size());
|
||||
}
|
||||
Set<String> deletedResources = new HashSet<>();
|
||||
List<DeleteConflict> deleteConflicts = new ArrayList<>();
|
||||
Map<BUNDLEENTRY, ResourceTable> entriesToProcess = new IdentityHashMap<>();
|
||||
Set<ResourceTable> nonUpdatedEntities = new HashSet<>();
|
||||
Set<ResourceTable> updatedEntities = new HashSet<>();
|
||||
Map<String, Class<? extends IBaseResource>> conditionalRequestUrls = new HashMap<>();
|
||||
|
||||
BUNDLEENTRY nextReqEntry = theEntries.get(i);
|
||||
IBaseResource res = myVersionAdapter.getResource(nextReqEntry);
|
||||
IIdType nextResourceId = null;
|
||||
if (res != null) {
|
||||
/*
|
||||
* Loop through the request and process any entries of type
|
||||
* PUT, POST or DELETE
|
||||
*/
|
||||
for (int i = 0; i < theEntries.size(); i++) {
|
||||
|
||||
nextResourceId = res.getIdElement();
|
||||
|
||||
if (!nextResourceId.hasIdPart()) {
|
||||
if (isNotBlank(myVersionAdapter.getFullUrl(nextReqEntry))) {
|
||||
nextResourceId = newIdType(myVersionAdapter.getFullUrl(nextReqEntry));
|
||||
}
|
||||
if (i % 100 == 0) {
|
||||
ourLog.debug("Processed {} non-GET entries out of {}", i, theEntries.size());
|
||||
}
|
||||
|
||||
if (nextResourceId.hasIdPart() && nextResourceId.getIdPart().matches("[a-zA-Z]+\\:.*") && !isPlaceholder(nextResourceId)) {
|
||||
throw new InvalidRequestException("Invalid placeholder ID found: " + nextResourceId.getIdPart() + " - Must be of the form 'urn:uuid:[uuid]' or 'urn:oid:[oid]'");
|
||||
}
|
||||
BUNDLEENTRY nextReqEntry = theEntries.get(i);
|
||||
IBaseResource res = myVersionAdapter.getResource(nextReqEntry);
|
||||
IIdType nextResourceId = null;
|
||||
if (res != null) {
|
||||
|
||||
if (nextResourceId.hasIdPart() && !nextResourceId.hasResourceType() && !isPlaceholder(nextResourceId)) {
|
||||
nextResourceId = newIdType(toResourceName(res.getClass()), nextResourceId.getIdPart());
|
||||
res.setId(nextResourceId);
|
||||
}
|
||||
nextResourceId = res.getIdElement();
|
||||
|
||||
/*
|
||||
* Ensure that the bundle doesn't have any duplicates, since this causes all kinds of weirdness
|
||||
*/
|
||||
if (isPlaceholder(nextResourceId)) {
|
||||
if (!theAllIds.add(nextResourceId)) {
|
||||
throw new InvalidRequestException(myContext.getLocalizer().getMessage(BaseHapiFhirSystemDao.class, "transactionContainsMultipleWithDuplicateId", nextResourceId));
|
||||
}
|
||||
} else if (nextResourceId.hasResourceType() && nextResourceId.hasIdPart()) {
|
||||
IIdType nextId = nextResourceId.toUnqualifiedVersionless();
|
||||
if (!theAllIds.add(nextId)) {
|
||||
throw new InvalidRequestException(myContext.getLocalizer().getMessage(BaseHapiFhirSystemDao.class, "transactionContainsMultipleWithDuplicateId", nextId));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
String verb = myVersionAdapter.getEntryRequestVerb(nextReqEntry);
|
||||
String resourceType = res != null ? myContext.getResourceDefinition(res).getName() : null;
|
||||
Integer order = theOriginalRequestOrder.get(nextReqEntry);
|
||||
BUNDLEENTRY nextRespEntry = myVersionAdapter.getEntries(theResponse).get(order);
|
||||
|
||||
theTransactionStopWatch.startTask("Bundle.entry[" + i + "]: " + verb + " " + defaultString(resourceType));
|
||||
|
||||
switch (verb) {
|
||||
case "POST": {
|
||||
// CREATE
|
||||
@SuppressWarnings("rawtypes")
|
||||
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
|
||||
res.setId((String) null);
|
||||
DaoMethodOutcome outcome;
|
||||
String matchUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextReqEntry);
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
outcome = resourceDao.create(res, matchUrl, false, theRequestDetails);
|
||||
if (nextResourceId != null) {
|
||||
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequestDetails);
|
||||
}
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
if (outcome.getCreated() == false) {
|
||||
nonUpdatedEntities.add(outcome.getEntity());
|
||||
} else {
|
||||
if (isNotBlank(matchUrl)) {
|
||||
conditionalRequestUrls.put(matchUrl, res.getClass());
|
||||
if (!nextResourceId.hasIdPart()) {
|
||||
if (isNotBlank(myVersionAdapter.getFullUrl(nextReqEntry))) {
|
||||
nextResourceId = newIdType(myVersionAdapter.getFullUrl(nextReqEntry));
|
||||
}
|
||||
}
|
||||
|
||||
if (nextResourceId.hasIdPart() && nextResourceId.getIdPart().matches("[a-zA-Z]+\\:.*") && !isPlaceholder(nextResourceId)) {
|
||||
throw new InvalidRequestException("Invalid placeholder ID found: " + nextResourceId.getIdPart() + " - Must be of the form 'urn:uuid:[uuid]' or 'urn:oid:[oid]'");
|
||||
}
|
||||
|
||||
if (nextResourceId.hasIdPart() && !nextResourceId.hasResourceType() && !isPlaceholder(nextResourceId)) {
|
||||
nextResourceId = newIdType(toResourceName(res.getClass()), nextResourceId.getIdPart());
|
||||
res.setId(nextResourceId);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that the bundle doesn't have any duplicates, since this causes all kinds of weirdness
|
||||
*/
|
||||
if (isPlaceholder(nextResourceId)) {
|
||||
if (!theAllIds.add(nextResourceId)) {
|
||||
throw new InvalidRequestException(myContext.getLocalizer().getMessage(BaseHapiFhirSystemDao.class, "transactionContainsMultipleWithDuplicateId", nextResourceId));
|
||||
}
|
||||
} else if (nextResourceId.hasResourceType() && nextResourceId.hasIdPart()) {
|
||||
IIdType nextId = nextResourceId.toUnqualifiedVersionless();
|
||||
if (!theAllIds.add(nextId)) {
|
||||
throw new InvalidRequestException(myContext.getLocalizer().getMessage(BaseHapiFhirSystemDao.class, "transactionContainsMultipleWithDuplicateId", nextId));
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case "DELETE": {
|
||||
// DELETE
|
||||
String url = extractTransactionUrlOrThrowException(nextReqEntry, verb);
|
||||
UrlUtil.UrlParts parts = UrlUtil.parseUrl(url);
|
||||
ca.uhn.fhir.jpa.dao.IFhirResourceDao<? extends IBaseResource> dao = toDao(parts, verb, url);
|
||||
int status = Constants.STATUS_HTTP_204_NO_CONTENT;
|
||||
if (parts.getResourceId() != null) {
|
||||
IIdType deleteId = newIdType(parts.getResourceType(), parts.getResourceId());
|
||||
if (!deletedResources.contains(deleteId.getValueAsString())) {
|
||||
DaoMethodOutcome outcome = dao.delete(deleteId, deleteConflicts, theRequestDetails);
|
||||
if (outcome.getEntity() != null) {
|
||||
deletedResources.add(deleteId.getValueAsString());
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
|
||||
String verb = myVersionAdapter.getEntryRequestVerb(nextReqEntry);
|
||||
String resourceType = res != null ? myContext.getResourceDefinition(res).getName() : null;
|
||||
Integer order = theOriginalRequestOrder.get(nextReqEntry);
|
||||
BUNDLEENTRY nextRespEntry = myVersionAdapter.getEntries(theResponse).get(order);
|
||||
|
||||
theTransactionStopWatch.startTask("Bundle.entry[" + i + "]: " + verb + " " + defaultString(resourceType));
|
||||
|
||||
switch (verb) {
|
||||
case "POST": {
|
||||
// CREATE
|
||||
@SuppressWarnings("rawtypes")
|
||||
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
|
||||
res.setId((String) null);
|
||||
DaoMethodOutcome outcome;
|
||||
String matchUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextReqEntry);
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
outcome = resourceDao.create(res, matchUrl, false, theRequestDetails);
|
||||
if (nextResourceId != null) {
|
||||
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequestDetails);
|
||||
}
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
if (outcome.getCreated() == false) {
|
||||
nonUpdatedEntities.add(outcome.getEntity());
|
||||
} else {
|
||||
if (isNotBlank(matchUrl)) {
|
||||
conditionalRequestUrls.put(matchUrl, res.getClass());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
String matchUrl = parts.getResourceType() + '?' + parts.getParams();
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
DeleteMethodOutcome deleteOutcome = dao.deleteByUrl(matchUrl, deleteConflicts, theRequestDetails);
|
||||
List<ResourceTable> allDeleted = deleteOutcome.getDeletedEntities();
|
||||
for (ResourceTable deleted : allDeleted) {
|
||||
deletedResources.add(deleted.getIdDt().toUnqualifiedVersionless().getValueAsString());
|
||||
}
|
||||
if (allDeleted.isEmpty()) {
|
||||
status = Constants.STATUS_HTTP_204_NO_CONTENT;
|
||||
}
|
||||
|
||||
myVersionAdapter.setResponseOutcome(nextRespEntry, deleteOutcome.getOperationOutcome());
|
||||
break;
|
||||
}
|
||||
|
||||
myVersionAdapter.setResponseStatus(nextRespEntry, toStatusString(status));
|
||||
|
||||
break;
|
||||
}
|
||||
case "PUT": {
|
||||
// UPDATE
|
||||
@SuppressWarnings("rawtypes")
|
||||
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
|
||||
|
||||
String url = extractTransactionUrlOrThrowException(nextReqEntry, verb);
|
||||
|
||||
DaoMethodOutcome outcome;
|
||||
UrlUtil.UrlParts parts = UrlUtil.parseUrl(url);
|
||||
if (isNotBlank(parts.getResourceId())) {
|
||||
String version = null;
|
||||
if (isNotBlank(myVersionAdapter.getEntryRequestIfMatch(nextReqEntry))) {
|
||||
version = ParameterUtil.parseETagValue(myVersionAdapter.getEntryRequestIfMatch(nextReqEntry));
|
||||
}
|
||||
res.setId(newIdType(parts.getResourceType(), parts.getResourceId(), version));
|
||||
outcome = resourceDao.update(res, null, false, theRequestDetails);
|
||||
} else {
|
||||
res.setId((String) null);
|
||||
String matchUrl;
|
||||
if (isNotBlank(parts.getParams())) {
|
||||
matchUrl = parts.getResourceType() + '?' + parts.getParams();
|
||||
case "DELETE": {
|
||||
// DELETE
|
||||
String url = extractTransactionUrlOrThrowException(nextReqEntry, verb);
|
||||
UrlUtil.UrlParts parts = UrlUtil.parseUrl(url);
|
||||
ca.uhn.fhir.jpa.dao.IFhirResourceDao<? extends IBaseResource> dao = toDao(parts, verb, url);
|
||||
int status = Constants.STATUS_HTTP_204_NO_CONTENT;
|
||||
if (parts.getResourceId() != null) {
|
||||
IIdType deleteId = newIdType(parts.getResourceType(), parts.getResourceId());
|
||||
if (!deletedResources.contains(deleteId.getValueAsString())) {
|
||||
DaoMethodOutcome outcome = dao.delete(deleteId, deleteConflicts, theRequestDetails);
|
||||
if (outcome.getEntity() != null) {
|
||||
deletedResources.add(deleteId.getValueAsString());
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
matchUrl = parts.getResourceType();
|
||||
}
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
outcome = resourceDao.update(res, matchUrl, false, theRequestDetails);
|
||||
if (Boolean.TRUE.equals(outcome.getCreated())) {
|
||||
conditionalRequestUrls.put(matchUrl, res.getClass());
|
||||
}
|
||||
}
|
||||
String matchUrl = parts.getResourceType() + '?' + parts.getParams();
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
DeleteMethodOutcome deleteOutcome = dao.deleteByUrl(matchUrl, deleteConflicts, theRequestDetails);
|
||||
List<ResourceTable> allDeleted = deleteOutcome.getDeletedEntities();
|
||||
for (ResourceTable deleted : allDeleted) {
|
||||
deletedResources.add(deleted.getIdDt().toUnqualifiedVersionless().getValueAsString());
|
||||
}
|
||||
if (allDeleted.isEmpty()) {
|
||||
status = Constants.STATUS_HTTP_204_NO_CONTENT;
|
||||
}
|
||||
|
||||
if (outcome.getCreated() == Boolean.FALSE) {
|
||||
updatedEntities.add(outcome.getEntity());
|
||||
}
|
||||
myVersionAdapter.setResponseOutcome(nextRespEntry, deleteOutcome.getOperationOutcome());
|
||||
}
|
||||
|
||||
myVersionAdapter.setResponseStatus(nextRespEntry, toStatusString(status));
|
||||
|
||||
break;
|
||||
}
|
||||
case "PUT": {
|
||||
// UPDATE
|
||||
@SuppressWarnings("rawtypes")
|
||||
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
|
||||
|
||||
String url = extractTransactionUrlOrThrowException(nextReqEntry, verb);
|
||||
|
||||
DaoMethodOutcome outcome;
|
||||
UrlUtil.UrlParts parts = UrlUtil.parseUrl(url);
|
||||
if (isNotBlank(parts.getResourceId())) {
|
||||
String version = null;
|
||||
if (isNotBlank(myVersionAdapter.getEntryRequestIfMatch(nextReqEntry))) {
|
||||
version = ParameterUtil.parseETagValue(myVersionAdapter.getEntryRequestIfMatch(nextReqEntry));
|
||||
}
|
||||
res.setId(newIdType(parts.getResourceType(), parts.getResourceId(), version));
|
||||
outcome = resourceDao.update(res, null, false, theRequestDetails);
|
||||
} else {
|
||||
res.setId((String) null);
|
||||
String matchUrl;
|
||||
if (isNotBlank(parts.getParams())) {
|
||||
matchUrl = parts.getResourceType() + '?' + parts.getParams();
|
||||
} else {
|
||||
matchUrl = parts.getResourceType();
|
||||
}
|
||||
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
|
||||
outcome = resourceDao.update(res, matchUrl, false, theRequestDetails);
|
||||
if (Boolean.TRUE.equals(outcome.getCreated())) {
|
||||
conditionalRequestUrls.put(matchUrl, res.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
if (outcome.getCreated() == Boolean.FALSE) {
|
||||
updatedEntities.add(outcome.getEntity());
|
||||
}
|
||||
|
||||
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequestDetails);
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
break;
|
||||
}
|
||||
case "GET":
|
||||
default:
|
||||
break;
|
||||
|
||||
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequestDetails);
|
||||
entriesToProcess.put(nextRespEntry, outcome.getEntity());
|
||||
break;
|
||||
}
|
||||
case "GET":
|
||||
default:
|
||||
break;
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Make sure that there are no conflicts from deletions. E.g. we can't delete something
|
||||
* if something else has a reference to it.. Unless the thing that has a reference to it
|
||||
* was also deleted as a part of this transaction, which is why we check this now at the
|
||||
* end.
|
||||
*/
|
||||
|
||||
deleteConflicts.removeIf(next ->
|
||||
deletedResources.contains(next.getTargetId().toUnqualifiedVersionless().getValue()));
|
||||
myDao.validateDeleteConflictsEmptyOrThrowException(deleteConflicts);
|
||||
|
||||
/*
|
||||
* Perform ID substitutions and then index each resource we have saved
|
||||
*/
|
||||
|
||||
FhirTerser terser = myContext.newTerser();
|
||||
theTransactionStopWatch.startTask("Index " + theIdToPersistedOutcome.size() + " resources");
|
||||
for (DaoMethodOutcome nextOutcome : theIdToPersistedOutcome.values()) {
|
||||
IBaseResource nextResource = nextOutcome.getResource();
|
||||
if (nextResource == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// References
|
||||
List<ResourceReferenceInfo> allRefs = terser.getAllResourceReferences(nextResource);
|
||||
for (ResourceReferenceInfo nextRef : allRefs) {
|
||||
IIdType nextId = nextRef.getResourceReference().getReferenceElement();
|
||||
if (!nextId.hasIdPart()) {
|
||||
continue;
|
||||
}
|
||||
if (theIdSubstitutions.containsKey(nextId)) {
|
||||
IIdType newId = theIdSubstitutions.get(nextId);
|
||||
ourLog.debug(" * Replacing resource ref {} with {}", nextId, newId);
|
||||
nextRef.getResourceReference().setReference(newId.getValue());
|
||||
} else if (nextId.getValue().startsWith("urn:")) {
|
||||
throw new InvalidRequestException("Unable to satisfy placeholder ID " + nextId.getValue() + " found in element named '" + nextRef.getName() + "' within resource of type: " + nextResource.getIdElement().getResourceType());
|
||||
} else {
|
||||
ourLog.debug(" * Reference [{}] does not exist in bundle", nextId);
|
||||
}
|
||||
}
|
||||
|
||||
// URIs
|
||||
Class<? extends IPrimitiveType<?>> uriType = (Class<? extends IPrimitiveType<?>>) myContext.getElementDefinition("uri").getImplementingClass();
|
||||
List<? extends IPrimitiveType<?>> allUris = terser.getAllPopulatedChildElementsOfType(nextResource, uriType);
|
||||
for (IPrimitiveType<?> nextRef : allUris) {
|
||||
if (nextRef instanceof IIdType) {
|
||||
continue; // No substitution on the resource ID itself!
|
||||
}
|
||||
IIdType nextUriString = newIdType(nextRef.getValueAsString());
|
||||
if (theIdSubstitutions.containsKey(nextUriString)) {
|
||||
IIdType newId = theIdSubstitutions.get(nextUriString);
|
||||
ourLog.debug(" * Replacing resource ref {} with {}", nextUriString, newId);
|
||||
nextRef.setValueAsString(newId.getValue());
|
||||
} else {
|
||||
ourLog.debug(" * Reference [{}] does not exist in bundle", nextUriString);
|
||||
}
|
||||
}
|
||||
|
||||
IPrimitiveType<Date> deletedInstantOrNull = ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) nextResource);
|
||||
Date deletedTimestampOrNull = deletedInstantOrNull != null ? deletedInstantOrNull.getValue() : null;
|
||||
|
||||
if (updatedEntities.contains(nextOutcome.getEntity())) {
|
||||
myDao.updateInternal(theRequestDetails, nextResource, true, false, theRequestDetails, nextOutcome.getEntity(), nextResource.getIdElement(), nextOutcome.getPreviousResource());
|
||||
} else if (!nonUpdatedEntities.contains(nextOutcome.getEntity())) {
|
||||
myDao.updateEntity(theRequestDetails, nextResource, nextOutcome.getEntity(), deletedTimestampOrNull, true, false, theUpdateTime, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
}
|
||||
theTransactionStopWatch.startTask("Flush writes to database");
|
||||
|
||||
flushJpaSession();
|
||||
|
||||
/*
|
||||
* Make sure that there are no conflicts from deletions. E.g. we can't delete something
|
||||
* if something else has a reference to it.. Unless the thing that has a reference to it
|
||||
* was also deleted as a part of this transaction, which is why we check this now at the
|
||||
* end.
|
||||
*/
|
||||
|
||||
deleteConflicts.removeIf(next ->
|
||||
deletedResources.contains(next.getTargetId().toUnqualifiedVersionless().getValue()));
|
||||
myDao.validateDeleteConflictsEmptyOrThrowException(deleteConflicts);
|
||||
|
||||
/*
|
||||
* Perform ID substitutions and then index each resource we have saved
|
||||
*/
|
||||
|
||||
FhirTerser terser = myContext.newTerser();
|
||||
theTransactionStopWatch.startTask("Index " + theIdToPersistedOutcome.size() + " resources");
|
||||
for (DaoMethodOutcome nextOutcome : theIdToPersistedOutcome.values()) {
|
||||
IBaseResource nextResource = nextOutcome.getResource();
|
||||
if (nextResource == null) {
|
||||
continue;
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
if (conditionalRequestUrls.size() > 0) {
|
||||
theTransactionStopWatch.startTask("Check for conflicts in conditional resources");
|
||||
}
|
||||
|
||||
// References
|
||||
List<ResourceReferenceInfo> allRefs = terser.getAllResourceReferences(nextResource);
|
||||
for (ResourceReferenceInfo nextRef : allRefs) {
|
||||
IIdType nextId = nextRef.getResourceReference().getReferenceElement();
|
||||
if (!nextId.hasIdPart()) {
|
||||
/*
|
||||
* Double check we didn't allow any duplicates we shouldn't have
|
||||
*/
|
||||
for (Map.Entry<String, Class<? extends IBaseResource>> nextEntry : conditionalRequestUrls.entrySet()) {
|
||||
String matchUrl = nextEntry.getKey();
|
||||
Class<? extends IBaseResource> resType = nextEntry.getValue();
|
||||
if (isNotBlank(matchUrl)) {
|
||||
IFhirResourceDao<?> resourceDao = myDao.getDao(resType);
|
||||
Set<Long> val = resourceDao.processMatchUrl(matchUrl);
|
||||
if (val.size() > 1) {
|
||||
throw new InvalidRequestException(
|
||||
"Unable to process " + theActionName + " - Request would cause multiple resources to match URL: \"" + matchUrl + "\". Does transaction request contain duplicates?");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
|
||||
for (IIdType next : theAllIds) {
|
||||
IIdType replacement = theIdSubstitutions.get(next);
|
||||
if (replacement == null) {
|
||||
continue;
|
||||
}
|
||||
if (theIdSubstitutions.containsKey(nextId)) {
|
||||
IIdType newId = theIdSubstitutions.get(nextId);
|
||||
ourLog.debug(" * Replacing resource ref {} with {}", nextId, newId);
|
||||
nextRef.getResourceReference().setReference(newId.getValue());
|
||||
} else if (nextId.getValue().startsWith("urn:")) {
|
||||
throw new InvalidRequestException("Unable to satisfy placeholder ID " + nextId.getValue() + " found in element named '" + nextRef.getName() + "' within resource of type: " + nextResource.getIdElement().getResourceType());
|
||||
} else {
|
||||
ourLog.debug(" * Reference [{}] does not exist in bundle", nextId);
|
||||
if (replacement.equals(next)) {
|
||||
continue;
|
||||
}
|
||||
ourLog.debug("Placeholder resource ID \"{}\" was replaced with permanent ID \"{}\"", next, replacement);
|
||||
}
|
||||
return entriesToProcess;
|
||||
|
||||
// URIs
|
||||
Class<? extends IPrimitiveType<?>> uriType = (Class<? extends IPrimitiveType<?>>) myContext.getElementDefinition("uri").getImplementingClass();
|
||||
List<? extends IPrimitiveType<?>> allUris = terser.getAllPopulatedChildElementsOfType(nextResource, uriType);
|
||||
for (IPrimitiveType<?> nextRef : allUris) {
|
||||
if (nextRef instanceof IIdType) {
|
||||
continue; // No substitution on the resource ID itself!
|
||||
}
|
||||
IIdType nextUriString = newIdType(nextRef.getValueAsString());
|
||||
if (theIdSubstitutions.containsKey(nextUriString)) {
|
||||
IIdType newId = theIdSubstitutions.get(nextUriString);
|
||||
ourLog.debug(" * Replacing resource ref {} with {}", nextUriString, newId);
|
||||
nextRef.setValueAsString(newId.getValue());
|
||||
} else {
|
||||
ourLog.debug(" * Reference [{}] does not exist in bundle", nextUriString);
|
||||
}
|
||||
}
|
||||
|
||||
IPrimitiveType<Date> deletedInstantOrNull = ResourceMetadataKeyEnum.DELETED_AT.get((IAnyResource) nextResource);
|
||||
Date deletedTimestampOrNull = deletedInstantOrNull != null ? deletedInstantOrNull.getValue() : null;
|
||||
|
||||
if (updatedEntities.contains(nextOutcome.getEntity())) {
|
||||
myDao.updateInternal(theRequestDetails, nextResource, true, false, theRequestDetails, nextOutcome.getEntity(), nextResource.getIdElement(), nextOutcome.getPreviousResource());
|
||||
} else if (!nonUpdatedEntities.contains(nextOutcome.getEntity())) {
|
||||
myDao.updateEntity(theRequestDetails, nextResource, nextOutcome.getEntity(), deletedTimestampOrNull, true, false, theUpdateTime, false, true);
|
||||
}
|
||||
} finally {
|
||||
theRequestDetails.stopDeferredRequestOperationCallbackAndRunDeferredItems();
|
||||
}
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
theTransactionStopWatch.startTask("Flush writes to database");
|
||||
|
||||
flushJpaSession();
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
if (conditionalRequestUrls.size() > 0) {
|
||||
theTransactionStopWatch.startTask("Check for conflicts in conditional resources");
|
||||
}
|
||||
|
||||
/*
|
||||
* Double check we didn't allow any duplicates we shouldn't have
|
||||
*/
|
||||
for (Map.Entry<String, Class<? extends IBaseResource>> nextEntry : conditionalRequestUrls.entrySet()) {
|
||||
String matchUrl = nextEntry.getKey();
|
||||
Class<? extends IBaseResource> resType = nextEntry.getValue();
|
||||
if (isNotBlank(matchUrl)) {
|
||||
IFhirResourceDao<?> resourceDao = myDao.getDao(resType);
|
||||
Set<Long> val = resourceDao.processMatchUrl(matchUrl);
|
||||
if (val.size() > 1) {
|
||||
throw new InvalidRequestException(
|
||||
"Unable to process " + theActionName + " - Request would cause multiple resources to match URL: \"" + matchUrl + "\". Does transaction request contain duplicates?");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
theTransactionStopWatch.endCurrentTask();
|
||||
|
||||
for (IIdType next : theAllIds) {
|
||||
IIdType replacement = theIdSubstitutions.get(next);
|
||||
if (replacement == null) {
|
||||
continue;
|
||||
}
|
||||
if (replacement.equals(next)) {
|
||||
continue;
|
||||
}
|
||||
ourLog.debug("Placeholder resource ID \"{}\" was replaced with permanent ID \"{}\"", next, replacement);
|
||||
}
|
||||
return entriesToProcess;
|
||||
}
|
||||
|
||||
private IIdType newIdType(String theResourceType, String theResourceId, String theVersion) {
|
||||
|
|
|
@ -56,8 +56,10 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
|
|||
* DELETE FROM foo WHERE params IN (aaaa)
|
||||
* type query and this can fail if we have 1000s of params
|
||||
*/
|
||||
public static int ourMaximumResultsToDeleteInOneStatement = 500;
|
||||
public static int ourMaximumResultsToDeleteInOnePass = 20000;
|
||||
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
|
||||
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 20000;
|
||||
private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
|
||||
private static int ourMaximumResultsToDeleteInOnePass = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
|
||||
private static Long ourNowForUnitTests;
|
||||
/*
|
||||
* We give a bit of extra leeway just to avoid race conditions where a query result
|
||||
|
@ -166,6 +168,11 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
|
|||
myCutoffSlack = theCutoffSlack;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
|
||||
ourMaximumResultsToDeleteInOnePass = theMaximumResultsToDeleteInOnePass;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setMaximumResultsToDeleteForUnitTest(int theMaximumResultsToDelete) {
|
||||
ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
|
||||
|
|
|
@ -7,10 +7,10 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
|||
import ca.uhn.fhir.jpa.config.BaseConfig;
|
||||
import ca.uhn.fhir.jpa.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.dao.MatchUrlService;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
|
||||
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
|
||||
import ca.uhn.fhir.jpa.dao.MatchUrlService;
|
||||
import ca.uhn.fhir.jpa.subscription.matcher.SubscriptionMatcherCompositeInMemoryDatabase;
|
||||
import ca.uhn.fhir.jpa.subscription.matcher.SubscriptionMatcherDatabase;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
|
@ -84,6 +84,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
|
||||
private static boolean ourForcePayloadEncodeAndDecodeForUnitTests;
|
||||
private final Object myInitSubscriptionsLock = new Object();
|
||||
private SubscribableChannel myProcessingChannel;
|
||||
private Map<String, SubscribableChannel> myDeliveryChannel;
|
||||
|
@ -97,7 +98,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
|
||||
private ThreadPoolExecutor myDeliveryExecutor;
|
||||
private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
|
||||
|
||||
@Autowired
|
||||
private FhirContext myCtx;
|
||||
@Autowired(required = false)
|
||||
|
@ -328,7 +328,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
myProcessingChannel = theProcessingChannel;
|
||||
}
|
||||
|
||||
|
||||
public List<CanonicalSubscription> getRegisteredSubscriptions() {
|
||||
return new ArrayList<>(myIdToSubscription.values());
|
||||
}
|
||||
|
@ -434,11 +433,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
|
||||
@Override
|
||||
public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theResource.getIdElement());
|
||||
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
|
||||
msg.setNewPayload(myCtx, theResource);
|
||||
submitResourceModified(msg);
|
||||
submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -455,10 +450,17 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
}
|
||||
|
||||
void submitResourceModifiedForUpdate(IBaseResource theNewResource) {
|
||||
submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
|
||||
}
|
||||
|
||||
private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) {
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setId(theNewResource.getIdElement());
|
||||
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
|
||||
msg.setOperationType(theOperationType);
|
||||
msg.setNewPayload(myCtx, theNewResource);
|
||||
if (ourForcePayloadEncodeAndDecodeForUnitTests) {
|
||||
msg.clearPayloadDecoded();
|
||||
}
|
||||
submitResourceModified(msg);
|
||||
}
|
||||
|
||||
|
@ -491,7 +493,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
myCtx = theCtx;
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public void setTxManager(PlatformTransactionManager theTxManager) {
|
||||
myTxManager = theTxManager;
|
||||
|
@ -598,7 +599,6 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
return myIdToSubscription.remove(subscriptionId);
|
||||
}
|
||||
|
||||
|
||||
public IFhirResourceDao<?> getSubscriptionDao() {
|
||||
return myDaoRegistry.getResourceDao("Subscription");
|
||||
}
|
||||
|
@ -618,7 +618,12 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
|||
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myCtx, criteria);
|
||||
myMatchUrlService.translateMatchUrl(criteria, resourceDef);
|
||||
} catch (InvalidRequestException e) {
|
||||
throw new UnprocessableEntityException("Invalid subscription criteria submitted: "+criteria+" "+e.getMessage());
|
||||
throw new UnprocessableEntityException("Invalid subscription criteria submitted: " + criteria + " " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setForcePayloadEncodeAndDecodeForUnitTests(boolean theForcePayloadEncodeAndDecodeForUnitTests) {
|
||||
ourForcePayloadEncodeAndDecodeForUnitTests = theForcePayloadEncodeAndDecodeForUnitTests;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
||||
public class ResourceModifiedMessage {
|
||||
|
@ -44,10 +46,16 @@ public class ResourceModifiedMessage {
|
|||
*/
|
||||
@JsonProperty(value = "subscriptionId", required = false)
|
||||
private String mySubscriptionId;
|
||||
@JsonProperty("newPayload")
|
||||
private String myNewPayloadEncoded;
|
||||
@JsonProperty("payload")
|
||||
private String myPayload;
|
||||
@JsonProperty("payloadId")
|
||||
private String myPayloadId;
|
||||
@JsonIgnore
|
||||
private transient IBaseResource myNewPayload;
|
||||
private transient IBaseResource myPayloadDecoded;
|
||||
|
||||
public String getPayloadId() {
|
||||
return myPayloadId;
|
||||
}
|
||||
|
||||
public String getSubscriptionId() {
|
||||
return mySubscriptionId;
|
||||
|
@ -66,10 +74,10 @@ public class ResourceModifiedMessage {
|
|||
}
|
||||
|
||||
public IBaseResource getNewPayload(FhirContext theCtx) {
|
||||
if (myNewPayload == null && myNewPayloadEncoded != null) {
|
||||
myNewPayload = theCtx.newJsonParser().parseResource(myNewPayloadEncoded);
|
||||
if (myPayloadDecoded == null && isNotBlank(myPayload)) {
|
||||
myPayloadDecoded = theCtx.newJsonParser().parseResource(myPayload);
|
||||
}
|
||||
return myNewPayload;
|
||||
return myPayloadDecoded;
|
||||
}
|
||||
|
||||
public OperationTypeEnum getOperationType() {
|
||||
|
@ -88,8 +96,19 @@ public class ResourceModifiedMessage {
|
|||
}
|
||||
|
||||
public void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
|
||||
myNewPayload = theNewPayload;
|
||||
myNewPayloadEncoded = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
|
||||
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
|
||||
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
|
||||
myPayloadDecoded = theNewPayload;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is mostly useful for unit tests - Clear the decoded payload so that
|
||||
* we force the encoded version to be used later. This proves that we get the same
|
||||
* behaviour in environments with serializing queues as we do with in-memory
|
||||
* queues.
|
||||
*/
|
||||
public void clearPayloadDecoded() {
|
||||
myPayloadDecoded = null;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.dao.index.ResourceIndexedSearchParams;
|
|||
import ca.uhn.fhir.jpa.dao.index.SearchParamExtractorService;
|
||||
import ca.uhn.fhir.jpa.entity.ResourceTable;
|
||||
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
|
||||
|
@ -45,7 +46,11 @@ public class SubscriptionMatcherInMemory implements ISubscriptionMatcher {
|
|||
|
||||
@Override
|
||||
public SubscriptionMatchResult match(String criteria, ResourceModifiedMessage msg) {
|
||||
return match(criteria, msg.getNewPayload(myContext));
|
||||
try {
|
||||
return match(criteria, msg.getNewPayload(myContext));
|
||||
} catch (Exception e) {
|
||||
throw new InternalErrorException("Failure processing resource ID[" + msg.getId(myContext) + "] for subscription ID[" + msg.getSubscriptionId() + "]: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
SubscriptionMatchResult match(String criteria, IBaseResource resource) {
|
||||
|
|
|
@ -36,10 +36,7 @@ import java.io.InputStream;
|
|||
import java.io.UnsupportedEncodingException;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
@ -815,6 +812,81 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTransactionUpdatingManuallyDeletedResource() {
|
||||
|
||||
// Create an observation
|
||||
Observation obs = new Observation();
|
||||
obs.addIdentifier().setSystem("urn:system").setValue("foo");
|
||||
IIdType obId = myObservationDao.create(obs).getId();
|
||||
|
||||
// Manually mark it a deleted
|
||||
runInTransaction(()->{
|
||||
myEntityManager.createNativeQuery("UPDATE HFJ_RESOURCE SET RES_DELETED_AT = CURRENT_TIMESTAMP").executeUpdate();
|
||||
});
|
||||
|
||||
runInTransaction(()->{
|
||||
ResourceTable obsTable = myResourceTableDao.findById(obId.getIdPartAsLong()).get();
|
||||
assertNotNull(obsTable.getDeleted());
|
||||
assertEquals(1L, obsTable.getVersion());
|
||||
});
|
||||
|
||||
// Now create a transaction
|
||||
|
||||
obs = new Observation();
|
||||
obs.setId(IdType.newRandomUuid());
|
||||
obs.addIdentifier().setSystem("urn:system").setValue("foo");
|
||||
|
||||
DiagnosticReport dr = new DiagnosticReport();
|
||||
dr.setId(IdType.newRandomUuid());
|
||||
dr.addIdentifier().setSystem("urn:system").setValue("bar");
|
||||
dr.addResult().setReference(obs.getId());
|
||||
|
||||
Bundle bundle = new Bundle();
|
||||
bundle.setType(BundleType.TRANSACTION);
|
||||
bundle.addEntry()
|
||||
.setResource(obs)
|
||||
.setFullUrl(obs.getId())
|
||||
.getRequest()
|
||||
.setMethod(HTTPVerb.PUT)
|
||||
.setUrl("Observation?identifier=urn:system|foo");
|
||||
bundle.addEntry()
|
||||
.setResource(dr)
|
||||
.setFullUrl(dr.getId())
|
||||
.getRequest()
|
||||
.setMethod(HTTPVerb.PUT)
|
||||
.setUrl("DiagnosticReport?identifier=urn:system|bar");
|
||||
|
||||
Bundle resp = mySystemDao.transaction(mySrd, bundle);
|
||||
assertEquals(2, resp.getEntry().size());
|
||||
|
||||
BundleEntryComponent respEntry = resp.getEntry().get(0);
|
||||
assertEquals(Constants.STATUS_HTTP_200_OK + " OK", respEntry.getResponse().getStatus());
|
||||
assertThat(respEntry.getResponse().getLocation(), containsString("Observation/" + obId.getIdPart()));
|
||||
assertThat(respEntry.getResponse().getLocation(), endsWith("/_history/3"));
|
||||
assertEquals("3", respEntry.getResponse().getEtag());
|
||||
|
||||
respEntry = resp.getEntry().get(1);
|
||||
assertEquals(Constants.STATUS_HTTP_201_CREATED + " Created", respEntry.getResponse().getStatus());
|
||||
assertThat(respEntry.getResponse().getLocation(), containsString("DiagnosticReport/"));
|
||||
assertThat(respEntry.getResponse().getLocation(), endsWith("/_history/1"));
|
||||
IdType drId = new IdType(respEntry.getResponse().getLocation());
|
||||
assertEquals("1", respEntry.getResponse().getEtag());
|
||||
|
||||
runInTransaction(()->{
|
||||
ResourceTable obsTable = myResourceTableDao.findById(obId.getIdPartAsLong()).get();
|
||||
assertNull(obsTable.getDeleted());
|
||||
assertEquals(3L, obsTable.getVersion());
|
||||
});
|
||||
|
||||
runInTransaction(()->{
|
||||
DiagnosticReport savedDr = myDiagnosticReportDao.read(drId);
|
||||
assertEquals(obId.toUnqualifiedVersionless().getValue(), savedDr.getResult().get(0).getReference());
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransactionCreateInlineMatchUrlWithOneMatchLastUpdated() {
|
||||
Bundle request = new Bundle();
|
||||
|
|
|
@ -32,7 +32,8 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
|
|||
super.after();
|
||||
StaleSearchDeletingSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(myStaleSearchDeletingSvc);
|
||||
staleSearchDeletingSvc.setCutoffSlackForUnitTest(StaleSearchDeletingSvcImpl.DEFAULT_CUTOFF_SLACK);
|
||||
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(10000);
|
||||
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(StaleSearchDeletingSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT);
|
||||
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(StaleSearchDeletingSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,6 +95,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
|
|||
@Test
|
||||
public void testDeleteVeryLargeSearch() {
|
||||
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteForUnitTest(10);
|
||||
StaleSearchDeletingSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(10);
|
||||
|
||||
runInTransaction(() -> {
|
||||
Search search = new Search();
|
||||
|
|
|
@ -4,9 +4,11 @@ import ca.uhn.fhir.context.FhirContext;
|
|||
import ca.uhn.fhir.jpa.config.TestR4Config;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.entity.ResourceIndexedSearchParamString;
|
||||
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.param.*;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
@ -23,8 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = {TestR4Config.class})
|
||||
|
@ -366,6 +367,31 @@ public class SubscriptionMatcherInMemoryTestR4 {
|
|||
assertNotMatched(obs02, params);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSearchReferenceInvalid() {
|
||||
Patient patient = new Patient();
|
||||
patient.setId("Patient/123");
|
||||
patient.addName().setFamily("FOO");
|
||||
patient.getManagingOrganization().setReference("urn:uuid:13720262-b392-465f-913e-54fb198ff954");
|
||||
|
||||
SearchParameterMap params;
|
||||
|
||||
params = new SearchParameterMap();
|
||||
params.add(Patient.SP_FAMILY, new StringParam("testSearchNameParam01Fam"));
|
||||
try {
|
||||
String criteria = params.toNormalizedQueryString(myContext);
|
||||
ResourceModifiedMessage msg = new ResourceModifiedMessage();
|
||||
msg.setSubscriptionId("Subscription/123");
|
||||
msg.setId(new IdType("Patient/ABC"));
|
||||
msg.setNewPayload(myContext, patient);
|
||||
SubscriptionMatchResult result = mySubscriptionMatcherInMemory.match(criteria, msg);
|
||||
fail();
|
||||
} catch (InternalErrorException e){
|
||||
assertEquals("Failure processing resource ID[Patient/ABC] for subscription ID[Subscription/123]: Invalid resource reference found at path[Patient.managingOrganization] - Does not contain resource type - urn:uuid:13720262-b392-465f-913e-54fb198ff954", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSearchResourceReferenceOnlyCorrectPath() {
|
||||
Organization org = new Organization();
|
||||
|
|
|
@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.r4;
|
|||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
|
||||
import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test;
|
||||
import ca.uhn.fhir.jpa.util.JpaConstants;
|
||||
import ca.uhn.fhir.rest.annotation.Create;
|
||||
|
@ -74,17 +75,10 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
myDaoConfig.setEnableInMemorySubscriptionMatching(true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void reportTotalSelects() {
|
||||
ourLog.info("Total database select queries: {}", getQueryCount().getSelect());
|
||||
}
|
||||
|
||||
private static QueryCount getQueryCount() {
|
||||
return ourCountHolder.getQueryCountMap().get("");
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterUnregisterRestHookListener() {
|
||||
BaseSubscriptionInterceptor.setForcePayloadEncodeAndDecodeForUnitTests(false);
|
||||
|
||||
for (IIdType next : mySubscriptionIds) {
|
||||
IIdType nextId = next.toUnqualifiedVersionless();
|
||||
ourLog.info("Deleting: {}", nextId);
|
||||
|
@ -422,6 +416,59 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
assertFalse(observation2.getId().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscriptionTriggerViaSubscription() throws Exception {
|
||||
BaseSubscriptionInterceptor.setForcePayloadEncodeAndDecodeForUnitTests(true);
|
||||
|
||||
String payload = "application/xml";
|
||||
|
||||
String code = "1000000050";
|
||||
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
|
||||
|
||||
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||
waitForRegisteredSubscriptionCount(1);
|
||||
|
||||
ourLog.info("** About to send obervation");
|
||||
|
||||
Observation observation = new Observation();
|
||||
observation.addIdentifier().setSystem("foo").setValue("bar1");
|
||||
observation.setId(IdType.newRandomUuid().getValue());
|
||||
CodeableConcept codeableConcept = new CodeableConcept()
|
||||
.addCoding(new Coding().setCode(code).setSystem("SNOMED-CT"));
|
||||
observation.setCode(codeableConcept);
|
||||
observation.setStatus(Observation.ObservationStatus.FINAL);
|
||||
|
||||
Patient patient = new Patient();
|
||||
patient.addIdentifier().setSystem("foo").setValue("bar2");
|
||||
patient.setId(IdType.newRandomUuid().getValue());
|
||||
patient.setActive(true);
|
||||
observation.getSubject().setReference(patient.getId());
|
||||
|
||||
Bundle requestBundle = new Bundle();
|
||||
requestBundle.setType(Bundle.BundleType.TRANSACTION);
|
||||
requestBundle.addEntry()
|
||||
.setResource(observation)
|
||||
.setFullUrl(observation.getId())
|
||||
.getRequest()
|
||||
.setUrl("Obervation?identifier=foo|bar1")
|
||||
.setMethod(Bundle.HTTPVerb.PUT);
|
||||
requestBundle.addEntry()
|
||||
.setResource(patient)
|
||||
.setFullUrl(patient.getId())
|
||||
.getRequest()
|
||||
.setUrl("Patient?identifier=foo|bar2")
|
||||
.setMethod(Bundle.HTTPVerb.PUT);
|
||||
ourClient.transaction().withBundle(requestBundle).execute();
|
||||
|
||||
// Should see 1 subscription notification
|
||||
waitForSize(0, ourCreatedObservations);
|
||||
waitForSize(1, ourUpdatedObservations);
|
||||
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
|
||||
|
||||
Observation obs = ourUpdatedObservations.get(0);
|
||||
ourLog.info("Observation content: {}", myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(obs));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateSubscriptionToMatchLater() throws Exception {
|
||||
String payload = "application/xml";
|
||||
|
@ -568,7 +615,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor());
|
||||
}
|
||||
|
||||
@Test(expected= UnprocessableEntityException.class)
|
||||
@Test(expected = UnprocessableEntityException.class)
|
||||
public void testInvalidProvenanceParam() {
|
||||
String payload = "application/fhir+json";
|
||||
String criteriabad = "Provenance?activity=http://hl7.org/fhir/v3/DocumentCompletion%7CAU";
|
||||
|
@ -576,7 +623,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
ourClient.create().resource(subscription).execute();
|
||||
}
|
||||
|
||||
@Test(expected= UnprocessableEntityException.class)
|
||||
@Test(expected = UnprocessableEntityException.class)
|
||||
public void testInvalidProcedureRequestParam() {
|
||||
String payload = "application/fhir+json";
|
||||
String criteriabad = "ProcedureRequest?intent=instance-order&category=Laboratory";
|
||||
|
@ -584,7 +631,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
ourClient.create().resource(subscription).execute();
|
||||
}
|
||||
|
||||
@Test(expected= UnprocessableEntityException.class)
|
||||
@Test(expected = UnprocessableEntityException.class)
|
||||
public void testInvalidBodySiteParam() {
|
||||
String payload = "application/fhir+json";
|
||||
String criteriabad = "BodySite?accessType=Catheter";
|
||||
|
@ -631,6 +678,15 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
|
|||
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void reportTotalSelects() {
|
||||
ourLog.info("Total database select queries: {}", getQueryCount().getSelect());
|
||||
}
|
||||
|
||||
private static QueryCount getQueryCount() {
|
||||
return ourCountHolder.getQueryCountMap().get("");
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startListenerServer() throws Exception {
|
||||
ourListenerPort = PortUtil.findFreePort();
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.io.Reader;
|
|||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.*;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
@ -55,7 +54,7 @@ public abstract class RequestDetails {
|
|||
private String myOperation;
|
||||
private Map<String, String[]> myParameters;
|
||||
private byte[] myRequestContents;
|
||||
private IRequestOperationCallback myRequestOperationCallback = new RequestOperationCallback();
|
||||
private IRequestOperationCallback myRequestOperationCallback;
|
||||
private String myRequestPath;
|
||||
private RequestTypeEnum myRequestType;
|
||||
private String myResourceName;
|
||||
|
@ -67,6 +66,13 @@ public abstract class RequestDetails {
|
|||
private Map<String, List<String>> myUnqualifiedToQualifiedNames;
|
||||
private Map<Object, Object> myUserData;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public RequestDetails() {
|
||||
myRequestOperationCallback = new RequestOperationCallback();
|
||||
}
|
||||
|
||||
public void addParameter(String theName, String[] theValues) {
|
||||
getParameters();
|
||||
myParameters.put(theName, theValues);
|
||||
|
@ -406,6 +412,94 @@ public abstract class RequestDetails {
|
|||
myRequestContents = theRequestContents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link #getRequestOperationCallback() requestOperationCallback} handler in
|
||||
* deferred mode, meaning that any notifications will be queued up for delivery, but
|
||||
* won't be delivered until {@link #stopDeferredRequestOperationCallbackAndRunDeferredItems()}
|
||||
* is called.
|
||||
*/
|
||||
public void startDeferredOperationCallback() {
|
||||
myRequestOperationCallback = new DeferredOperationCallback(myRequestOperationCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #startDeferredOperationCallback()
|
||||
*/
|
||||
public void stopDeferredRequestOperationCallbackAndRunDeferredItems() {
|
||||
DeferredOperationCallback deferredCallback = (DeferredOperationCallback) myRequestOperationCallback;
|
||||
deferredCallback.playDeferredActions();
|
||||
myRequestOperationCallback = deferredCallback.getWrap();
|
||||
}
|
||||
|
||||
|
||||
private class DeferredOperationCallback implements IRequestOperationCallback {
|
||||
|
||||
private final IRequestOperationCallback myWrap;
|
||||
private final List<Runnable> myDeferredTasks = new ArrayList<>();
|
||||
|
||||
private DeferredOperationCallback(IRequestOperationCallback theWrap) {
|
||||
myWrap = theWrap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceCreated(IBaseResource theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourceCreated(theResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceDeleted(IBaseResource theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourceDeleted(theResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcePreCreate(IBaseResource theResource) {
|
||||
myWrap.resourcePreCreate(theResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcePreDelete(IBaseResource theResource) {
|
||||
myWrap.resourcePreDelete(theResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcePreUpdate(IBaseResource theOldResource, IBaseResource theNewResource) {
|
||||
myWrap.resourcePreUpdate(theOldResource, theNewResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceUpdated(IBaseResource theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourceUpdated(theResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourceUpdated(theOldResource, theNewResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcesCreated(Collection<? extends IBaseResource> theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourcesCreated(theResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcesDeleted(Collection<? extends IBaseResource> theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourcesDeleted(theResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourcesUpdated(Collection<? extends IBaseResource> theResource) {
|
||||
myDeferredTasks.add(()-> myWrap.resourcesUpdated(theResource));
|
||||
}
|
||||
|
||||
void playDeferredActions() {
|
||||
myDeferredTasks.forEach(Runnable::run);
|
||||
}
|
||||
|
||||
IRequestOperationCallback getWrap() {
|
||||
return myWrap;
|
||||
}
|
||||
}
|
||||
|
||||
private class RequestOperationCallback implements IRequestOperationCallback {
|
||||
|
||||
private List<IServerInterceptor> getInterceptors() {
|
||||
|
@ -499,6 +593,7 @@ public abstract class RequestDetails {
|
|||
/**
|
||||
* @deprecated Deprecated in HAPI FHIR 2.6 - Use {@link IRequestOperationCallback#resourceUpdated(IBaseResource, IBaseResource)} instead
|
||||
*/
|
||||
@Override
|
||||
@Deprecated
|
||||
public void resourcesUpdated(Collection<? extends IBaseResource> theResource) {
|
||||
for (IBaseResource next : theResource) {
|
||||
|
|
Loading…
Reference in New Issue