Start refactoring of basetransactionprocessor

This commit is contained in:
Tadgh 2021-09-11 15:13:05 -04:00
parent b224463d35
commit 94f157a4ac
3 changed files with 121 additions and 60 deletions

View File

@ -815,6 +815,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>

View File

@ -93,6 +93,7 @@ import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -481,7 +482,7 @@ public abstract class BaseTransactionProcessor {
entries.sort(new TransactionSorter(placeholderIds));
// perform all writes
doTransactionWriteOperations(theRequestDetails, theActionName,
prepareThenExecuteTransactionWriteOperations(theRequestDetails, theActionName,
transactionDetails, transactionStopWatch,
response, originalRequestOrder, entries);
@ -584,38 +585,15 @@ public abstract class BaseTransactionProcessor {
* heavy load with lots of concurrent transactions using all available
* database connections.
*/
private void doTransactionWriteOperations(RequestDetails theRequestDetails, String theActionName,
private void prepareThenExecuteTransactionWriteOperations(RequestDetails theRequestDetails, String theActionName,
TransactionDetails theTransactionDetails, StopWatch theTransactionStopWatch,
IBaseBundle theResponse, IdentityHashMap<IBase, Integer> theOriginalRequestOrder,
List<IBase> theEntries) {
TransactionWriteOperationsDetails writeOperationsDetails = null;
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails) ||
CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, myInterceptorBroadcaster, theRequestDetails)) {
List<String> updateRequestUrls = new ArrayList<>();
List<String> conditionalCreateRequestUrls = new ArrayList<>();
for (IBase nextEntry : theEntries) {
String method = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if ("PUT".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
updateRequestUrls.add(requestUrl);
}
} else if ("POST".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextEntry);
if (isNotBlank(requestUrl) && requestUrl.contains("?")) {
conditionalCreateRequestUrls.add(requestUrl);
}
}
}
writeOperationsDetails = new TransactionWriteOperationsDetails();
writeOperationsDetails.setUpdateRequestUrls(updateRequestUrls);
writeOperationsDetails.setConditionalCreateRequestUrls(conditionalCreateRequestUrls);
HookParams params = new HookParams()
.add(TransactionDetails.class, theTransactionDetails)
.add(TransactionWriteOperationsDetails.class, writeOperationsDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, params);
if (haveWriteOperationsHooks(theRequestDetails)) {
writeOperationsDetails = buildWriteOperationsDetails(theEntries);
callWriteOperationsHook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, theRequestDetails, theTransactionDetails, writeOperationsDetails);
}
TransactionCallback<Map<IBase, IIdType>> txCallback = status -> {
@ -636,13 +614,9 @@ public abstract class BaseTransactionProcessor {
try {
entriesToProcess = myHapiTransactionService.execute(theRequestDetails, theTransactionDetails, txCallback);
}
finally {
if (writeOperationsDetails != null) {
HookParams params = new HookParams()
.add(TransactionDetails.class, theTransactionDetails)
.add(TransactionWriteOperationsDetails.class, writeOperationsDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, params);
} finally {
if (haveWriteOperationsHooks(theRequestDetails)) {
callWriteOperationsHook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, theRequestDetails, theTransactionDetails, writeOperationsDetails);
}
}
@ -656,6 +630,45 @@ public abstract class BaseTransactionProcessor {
}
}
private boolean haveWriteOperationsHooks(RequestDetails theRequestDetails) {
return CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails) ||
CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST, myInterceptorBroadcaster, theRequestDetails);
}
private void callWriteOperationsHook(Pointcut thePointcut, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) {
HookParams params = new HookParams()
.add(TransactionDetails.class, theTransactionDetails)
.add(TransactionWriteOperationsDetails.class, theWriteOperationsDetails);
CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, thePointcut, params);
}
@NotNull
private TransactionWriteOperationsDetails buildWriteOperationsDetails(List<IBase> theEntries) {
TransactionWriteOperationsDetails writeOperationsDetails;
List<String> updateRequestUrls = new ArrayList<>();
List<String> conditionalCreateRequestUrls = new ArrayList<>();
//Extract
for (IBase nextEntry : theEntries) {
String method = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if ("PUT".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
updateRequestUrls.add(requestUrl);
}
} else if ("POST".equals(method)) {
String requestUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextEntry);
if (isNotBlank(requestUrl) && requestUrl.contains("?")) {
conditionalCreateRequestUrls.add(requestUrl);
}
}
}
writeOperationsDetails = new TransactionWriteOperationsDetails();
writeOperationsDetails.setUpdateRequestUrls(updateRequestUrls);
writeOperationsDetails.setConditionalCreateRequestUrls(conditionalCreateRequestUrls);
return writeOperationsDetails;
}
private boolean isValidVerb(String theVerb) {
try {
return org.hl7.fhir.r4.model.Bundle.HTTPVerb.fromCode(theVerb) != null;
@ -704,7 +717,7 @@ public abstract class BaseTransactionProcessor {
IBaseResource resource = myVersionAdapter.getResource(nextReqEntry);
if (resource != null) {
String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextReqEntry);
String entryUrl = myVersionAdapter.getFullUrl(nextReqEntry);
String entryFullUrl = myVersionAdapter.getFullUrl(nextReqEntry);
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextReqEntry);
String ifNoneExist = myVersionAdapter.getEntryRequestIfNoneExist(nextReqEntry);
String key = verb + "|" + requestUrl + "|" + ifNoneExist;
@ -712,7 +725,7 @@ public abstract class BaseTransactionProcessor {
// Conditional UPDATE
boolean consolidateEntry = false;
if ("PUT".equals(verb)) {
if (isNotBlank(entryUrl) && isNotBlank(requestUrl)) {
if (isNotBlank(entryFullUrl) && isNotBlank(requestUrl)) {
int questionMarkIndex = requestUrl.indexOf('?');
if (questionMarkIndex >= 0 && requestUrl.length() > (questionMarkIndex + 1)) {
consolidateEntry = true;
@ -722,8 +735,8 @@ public abstract class BaseTransactionProcessor {
// Conditional CREATE
if ("POST".equals(verb)) {
if (isNotBlank(entryUrl) && isNotBlank(requestUrl) && isNotBlank(ifNoneExist)) {
if (!entryUrl.equals(requestUrl)) {
if (isNotBlank(entryFullUrl) && isNotBlank(requestUrl) && isNotBlank(ifNoneExist)) {
if (!entryFullUrl.equals(requestUrl)) {
consolidateEntry = true;
}
}
@ -731,12 +744,24 @@ public abstract class BaseTransactionProcessor {
if (consolidateEntry) {
if (!keyToUuid.containsKey(key)) {
keyToUuid.put(key, entryUrl);
keyToUuid.put(key, entryFullUrl);
} else {
ourLog.info("Discarding transaction bundle entry {} as it contained a duplicate conditional {}", originalIndex, verb);
theEntries.remove(index);
index--;
String existingUuid = keyToUuid.get(key);
replaceReferencesInEntriesWithConsolidatedUUID(theEntries, entryFullUrl, existingUuid);
}
}
}
}
}
/**
* Iterates over all entries, and if it finds any which have references which match the fullUrl of the entry that was consolidated out
* replace them with our new consolidated UUID
*/
private void replaceReferencesInEntriesWithConsolidatedUUID(List<IBase> theEntries, String theEntryFullUrl, String existingUuid) {
for (IBase nextEntry : theEntries) {
IBaseResource nextResource = myVersionAdapter.getResource(nextEntry);
for (IBaseReference nextReference : myContext.newTerser().getAllPopulatedChildElementsOfType(nextResource, IBaseReference.class)) {
@ -746,17 +771,13 @@ public abstract class BaseTransactionProcessor {
if (isBlank(nextReferenceId) && nextReference.getResource() != null) {
nextReferenceId = nextReference.getResource().getIdElement().getValue();
}
if (entryUrl.equals(nextReferenceId)) {
if (theEntryFullUrl.equals(nextReferenceId)) {
nextReference.setReference(existingUuid);
nextReference.setResource(null);
}
}
}
}
}
}
}
}
/**
* Retrieves the next resource id (IIdType) from the base resource and next request entry.
@ -810,12 +831,16 @@ public abstract class BaseTransactionProcessor {
return nextResourceId;
}
/** After pre-hooks have been called
*
*/
protected Map<IBase, IIdType> doTransactionWriteOperations(final RequestDetails theRequest, String theActionName,
TransactionDetails theTransactionDetails, Set<IIdType> theAllIds,
Map<IIdType, IIdType> theIdSubstitutions, Map<IIdType, DaoMethodOutcome> theIdToPersistedOutcome,
IBaseBundle theResponse, IdentityHashMap<IBase, Integer> theOriginalRequestOrder,
List<IBase> theEntries, StopWatch theTransactionStopWatch) {
// During a transaction, we don't execute hooks, instead, we execute them all post-transaction.
theTransactionDetails.beginAcceptingDeferredInterceptorBroadcasts(
Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED,
Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED,

View File

@ -1206,6 +1206,34 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
assertThat(observationLocation, matchesPattern("Observation/[a-z0-9-]+/_history/1"));
}
@Test
public void testConditionalUrlWhichDoesNotMatcHResource() {
Bundle transactionBundle = new Bundle().setType(BundleType.TRANSACTION);
// Patient
HumanName patientName = new HumanName().setFamily("TEST_LAST_NAME").addGiven("TEST_FIRST_NAME");
Identifier patientIdentifier = new Identifier().setSystem("http://example.com/mrns").setValue("U1234567890");
Patient patient = new Patient()
.setName(List.of(patientName))
.setIdentifier(List.of(patientIdentifier));
patient.setId(IdType.newRandomUuid());
transactionBundle
.addEntry()
.setFullUrl(patient.getId())
.setResource(patient)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("/Patient?identifier=" + patientIdentifier.getSystem() + "|" + "zoop");
ourLog.info("Patient TEMP UUID: {}", patient.getId());
String s = myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(transactionBundle);
System.out.println(s);
Bundle outcome= mySystemDao.transaction(null, transactionBundle);
String patientLocation = outcome.getEntry().get(0).getResponse().getLocation();
assertThat(patientLocation, matchesPattern("Patient/[a-z0-9-]+/_history/1"));
}
@Test
public void testTransactionCreateInlineMatchUrlWithOneMatch() {
String methodName = "testTransactionCreateInlineMatchUrlWithOneMatch";