From a894779760bf93b073a84d63febf5830c55e6dee Mon Sep 17 00:00:00 2001 From: James Agnew Date: Fri, 9 Aug 2024 11:51:20 -0400 Subject: [PATCH] Partition aware transactions (#6167) * Partition aware transactions * Address review comments * Test fixes * Remove dead issue field * Test fixes --------- Co-authored-by: Tadgh --- .../interceptor/model/RequestPartitionId.java | 23 ++ .../ca/uhn/fhir/repository/Repository.java | 19 ++ .../ca/uhn/fhir/i18n/hapi-messages.properties | 1 + .../model/RequestPartitionIdTest.java | 44 ++++ ...e-transactions-better-partition-aware.yaml | 7 + .../fhir/jpa/dao/TransactionProcessor.java | 38 +--- .../model/ReadPartitionIdRequestDetails.java | 18 ++ .../jpa/dao/r4/BasePartitioningR4Test.java | 14 +- .../PartitionedStrictTransactionR4Test.java | 211 ++++++++++++++++++ .../jpa/dao/r4/PartitioningSqlR4Test.java | 4 +- .../stresstest/GiantTransactionPerfTest.java | 1 + .../jpa/dao/BaseTransactionProcessor.java | 100 ++++++++- .../jpa/dao/tx/HapiTransactionService.java | 20 +- .../jpa/dao/tx/IHapiTransactionService.java | 14 ++ 14 files changed, 470 insertions(+), 44 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6163-make-transactions-better-partition-aware.yaml create mode 100644 hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitionedStrictTransactionR4Test.java diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java index 283456d01f3..cfb538997ac 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; @@ -98,6 +99,28 @@ public class RequestPartitionId implements IModelJson { myAllPartitions = true; } + /** + * Creates a new RequestPartitionId which includes all partition IDs from + * this {@link RequestPartitionId} but also includes all IDs from the given + * {@link RequestPartitionId}. Any duplicates are only included once, and + * partition names and dates are ignored and not returned. This {@link RequestPartitionId} + * and {@literal theOther} are not modified. + * + * @since 7.4.0 + */ + public RequestPartitionId mergeIds(RequestPartitionId theOther) { + if (isAllPartitions() || theOther.isAllPartitions()) { + return RequestPartitionId.allPartitions(); + } + + List thisPartitionIds = getPartitionIds(); + List otherPartitionIds = theOther.getPartitionIds(); + List newPartitionIds = Stream.concat(thisPartitionIds.stream(), otherPartitionIds.stream()) + .distinct() + .collect(Collectors.toList()); + return RequestPartitionId.fromPartitionIds(newPartitionIds); + } + public static RequestPartitionId fromJson(String theJson) throws JsonProcessingException { return ourObjectMapper.readValue(theJson, RequestPartitionId.class); } diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/repository/Repository.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/repository/Repository.java index a95de2b450e..e859a9ae569 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/repository/Repository.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/repository/Repository.java @@ -1,3 +1,22 @@ +/*- + * #%L + * HAPI FHIR - Core Library + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ package ca.uhn.fhir.repository; import ca.uhn.fhir.context.FhirContext; diff --git a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties index 3da79d2065d..fbd3091918a 100644 --- a/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties +++ b/hapi-fhir-base/src/main/resources/ca/uhn/fhir/i18n/hapi-messages.properties @@ -94,6 +94,7 @@ ca.uhn.fhir.jpa.dao.BaseStorageDao.inlineMatchNotSupported=Inline match URLs are ca.uhn.fhir.jpa.dao.BaseStorageDao.transactionOperationWithMultipleMatchFailure=Failed to {0} resource with match URL "{1}" because this search matched {2} resources ca.uhn.fhir.jpa.dao.BaseStorageDao.deleteByUrlThresholdExceeded=Failed to DELETE resources with match URL "{0}" because the resolved number of resources: {1} exceeds the threshold of {2} ca.uhn.fhir.jpa.dao.BaseStorageDao.transactionOperationWithIdNotMatchFailure=Failed to {0} resource with match URL "{1}" because the matching resource does not match the provided ID +ca.uhn.fhir.jpa.dao.BaseTransactionProcessor.multiplePartitionAccesses=Can not process transaction with {0} entries: Entries require access to multiple/conflicting partitions ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.transactionOperationFailedNoId=Failed to {0} resource in transaction because no ID was provided ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.transactionOperationFailedUnknownId=Failed to {0} resource in transaction because no resource could be found with ID {1} ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.uniqueIndexConflictFailure=Can not create resource of type {0} as it would create a duplicate unique index matching query: {1} (existing index belongs to {2}, new unique index created by {3}) diff --git a/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java b/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java index 51c837d4f38..8d6e934ba73 100644 --- a/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java +++ b/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java @@ -41,6 +41,50 @@ public class RequestPartitionIdTest { assertFalse(RequestPartitionId.forPartitionIdsAndNames(null, Lists.newArrayList(1, 2), null).isDefaultPartition()); } + @Test + public void testMergeIds() { + RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3); + RequestPartitionId input1 = RequestPartitionId.fromPartitionIds(1, 2, 4); + + RequestPartitionId actual = input0.mergeIds(input1); + RequestPartitionId expected = RequestPartitionId.fromPartitionIds(1, 2, 3, 4); + assertEquals(expected, actual); + + } + + @Test + public void testMergeIds_ThisAllPartitions() { + RequestPartitionId input0 = RequestPartitionId.allPartitions(); + RequestPartitionId input1 = RequestPartitionId.fromPartitionIds(1, 2, 4); + + RequestPartitionId actual = input0.mergeIds(input1); + RequestPartitionId expected = RequestPartitionId.allPartitions(); + assertEquals(expected, actual); + + } + + @Test + public void testMergeIds_OtherAllPartitions() { + RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3); + RequestPartitionId input1 = RequestPartitionId.allPartitions(); + + RequestPartitionId actual = input0.mergeIds(input1); + RequestPartitionId expected = RequestPartitionId.allPartitions(); + assertEquals(expected, actual); + + } + + @Test + public void testMergeIds_IncludesDefault() { + RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3); + RequestPartitionId input1 = RequestPartitionId.defaultPartition(); + + RequestPartitionId actual = input0.mergeIds(input1); + RequestPartitionId expected = RequestPartitionId.fromPartitionIds(1, 2, 3, null); + assertEquals(expected, actual); + + } + @Test public void testSerDeserSer() throws JsonProcessingException { { diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6163-make-transactions-better-partition-aware.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6163-make-transactions-better-partition-aware.yaml new file mode 100644 index 00000000000..2813c6391ae --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6163-make-transactions-better-partition-aware.yaml @@ -0,0 +1,7 @@ +--- +type: fix +jira: SMILE-8652 +title: "When JPA servers are configured to always require a new database + transaction when switching partitions, the server will now correctly + identify the correct partition for FHIR transaction operations, and + fail the operation if multiple partitions would be required." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java index 93d81a1f82c..86d9f727906 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java @@ -27,7 +27,6 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect; -import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.dao.JpaPid; import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken; import ca.uhn.fhir.jpa.model.entity.StorageSettings; @@ -97,9 +96,6 @@ public class TransactionProcessor extends BaseTransactionProcessor { @Autowired private IIdHelperService myIdHelperService; - @Autowired - private PartitionSettings myPartitionSettings; - @Autowired private JpaStorageSettings myStorageSettings; @@ -150,14 +146,9 @@ public class TransactionProcessor extends BaseTransactionProcessor { List theEntries, StopWatch theTransactionStopWatch) { - ITransactionProcessorVersionAdapter versionAdapter = getVersionAdapter(); - RequestPartitionId requestPartitionId = null; - if (!myPartitionSettings.isPartitioningEnabled()) { - requestPartitionId = RequestPartitionId.allPartitions(); - } else { - // If all entries in the transaction point to the exact same partition, we'll try and do a pre-fetch - requestPartitionId = getSinglePartitionForAllEntriesOrNull(theRequest, theEntries, versionAdapter); - } + ITransactionProcessorVersionAdapter versionAdapter = getVersionAdapter(); + RequestPartitionId requestPartitionId = + super.determineRequestPartitionIdForWriteEntries(theRequest, theEntries); if (requestPartitionId != null) { preFetch(theTransactionDetails, theEntries, versionAdapter, requestPartitionId); @@ -472,24 +463,6 @@ public class TransactionProcessor extends BaseTransactionProcessor { } } - private RequestPartitionId getSinglePartitionForAllEntriesOrNull( - RequestDetails theRequest, List theEntries, ITransactionProcessorVersionAdapter versionAdapter) { - RequestPartitionId retVal = null; - Set requestPartitionIdsForAllEntries = new HashSet<>(); - for (IBase nextEntry : theEntries) { - IBaseResource resource = versionAdapter.getResource(nextEntry); - if (resource != null) { - RequestPartitionId requestPartition = myRequestPartitionSvc.determineCreatePartitionForRequest( - theRequest, resource, myFhirContext.getResourceType(resource)); - requestPartitionIdsForAllEntries.add(requestPartition); - } - } - if (requestPartitionIdsForAllEntries.size() == 1) { - retVal = requestPartitionIdsForAllEntries.iterator().next(); - } - return retVal; - } - /** * Given a token parameter, build the query predicate based on its hash. Uses system and value if both are available, otherwise just value. * If neither are available, it returns null. @@ -570,11 +543,6 @@ public class TransactionProcessor extends BaseTransactionProcessor { } } - @VisibleForTesting - public void setPartitionSettingsForUnitTest(PartitionSettings thePartitionSettings) { - myPartitionSettings = thePartitionSettings; - } - @VisibleForTesting public void setIdHelperServiceForUnitTest(IIdHelperService theIdHelperService) { myIdHelperService = theIdHelperService; diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/interceptor/model/ReadPartitionIdRequestDetails.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/interceptor/model/ReadPartitionIdRequestDetails.java index 36218fd388f..5748dac592d 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/interceptor/model/ReadPartitionIdRequestDetails.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/interceptor/model/ReadPartitionIdRequestDetails.java @@ -113,6 +113,24 @@ public class ReadPartitionIdRequestDetails extends PartitionIdRequestDetails { null, RestOperationTypeEnum.EXTENDED_OPERATION_SERVER, null, null, null, null, theOperationName); } + /** + * @since 7.4.0 + */ + public static ReadPartitionIdRequestDetails forDelete(@Nonnull String theResourceType, @Nonnull IIdType theId) { + RestOperationTypeEnum op = RestOperationTypeEnum.DELETE; + return new ReadPartitionIdRequestDetails( + theResourceType, op, theId.withResourceType(theResourceType), null, null, null, null); + } + + /** + * @since 7.4.0 + */ + public static ReadPartitionIdRequestDetails forPatch(String theResourceType, IIdType theId) { + RestOperationTypeEnum op = RestOperationTypeEnum.PATCH; + return new ReadPartitionIdRequestDetails( + theResourceType, op, theId.withResourceType(theResourceType), null, null, null, null); + } + public static ReadPartitionIdRequestDetails forRead( String theResourceType, @Nonnull IIdType theId, boolean theIsVread) { RestOperationTypeEnum op = theIsVread ? RestOperationTypeEnum.VREAD : RestOperationTypeEnum.READ; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java index 1507e9487ed..0773223b9f0 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java @@ -55,7 +55,7 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest { @AfterEach public void after() { - myPartitionInterceptor.assertNoRemainingIds(); + assertNoRemainingPartitionIds(); myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes()); myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled()); @@ -70,6 +70,10 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest { myStorageSettings.setMatchUrlCacheEnabled(new JpaStorageSettings().getMatchUrlCache()); } + protected void assertNoRemainingPartitionIds() { + myPartitionInterceptor.assertNoRemainingIds(); + } + @Override @BeforeEach public void before() throws Exception { @@ -89,7 +93,8 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest { myPartitionId4 = 4; myPartitionInterceptor = new MyReadWriteInterceptor(); - mySrdInterceptorService.registerInterceptor(myPartitionInterceptor); + + registerPartitionInterceptor(); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(myPartitionId).setName(PARTITION_1), null); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(myPartitionId2).setName(PARTITION_2), null); @@ -106,6 +111,11 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest { for (int i = 1; i <= 4; i++) { myPartitionConfigSvc.getPartitionById(i); } + + } + + protected void registerPartitionInterceptor() { + mySrdInterceptorService.registerInterceptor(myPartitionInterceptor); } @Override diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitionedStrictTransactionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitionedStrictTransactionR4Test.java new file mode 100644 index 00000000000..6c8f9f09d8d --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitionedStrictTransactionR4Test.java @@ -0,0 +1,211 @@ +package ca.uhn.fhir.jpa.dao.r4; + +import ca.uhn.fhir.interceptor.api.Hook; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; +import ca.uhn.fhir.util.BundleBuilder; +import jakarta.annotation.Nonnull; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.CodeType; +import org.hl7.fhir.r4.model.IdType; +import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Parameters; +import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.StringType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Propagation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PartitionedStrictTransactionR4Test extends BasePartitioningR4Test { + + @Autowired + private HapiTransactionService myTransactionService; + + @Override + public void before() throws Exception { + super.before(); + myTransactionService.setTransactionPropagationWhenChangingPartitions(Propagation.REQUIRES_NEW); + } + + @Override + public void after() { + super.after(); + myTransactionService.setTransactionPropagationWhenChangingPartitions(HapiTransactionService.DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS); + myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyPartitionSelectorInterceptor); + } + + /** + * We manually register {@link MyPartitionSelectorInterceptor} for this test class + * as the partition interceptor + */ + @Override + protected void registerPartitionInterceptor() { + myInterceptorRegistry.registerInterceptor(new MyPartitionSelectorInterceptor()); + } + + @Override + protected void assertNoRemainingPartitionIds() { + // We don't use the superclass to manage partition IDs + } + + + @ParameterizedTest + @CsvSource({ + "batch , 2", + "transaction , 1", + }) + public void testSinglePartitionCreate(String theBundleType, int theExpectedCommitCount) { + BundleBuilder bb = new BundleBuilder(myFhirContext); + bb.addTransactionCreateEntry(newPatient()); + bb.addTransactionCreateEntry(newPatient()); + bb.setType(theBundleType); + Bundle input = bb.getBundleTyped(); + + // Test + myCaptureQueriesListener.clear(); + Bundle output = mySystemDao.transaction(mySrd, input); + + // Verify + assertEquals(theExpectedCommitCount, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); + IdType id = new IdType(output.getEntry().get(0).getResponse().getLocation()); + Patient actualPatient = myPatientDao.read(id, mySrd); + RequestPartitionId actualPartitionId = (RequestPartitionId) actualPatient.getUserData(Constants.RESOURCE_PARTITION_ID); + assertThat(actualPartitionId.getPartitionIds()).containsExactly(myPartitionId); + } + + + @Test + public void testSinglePartitionDelete() { + createPatient(withId("A"), withActiveTrue()); + + BundleBuilder bb = new BundleBuilder(myFhirContext); + bb.addTransactionDeleteEntry(new IdType("Patient/A")); + Bundle input = bb.getBundleTyped(); + + // Test + myCaptureQueriesListener.clear(); + Bundle output = mySystemDao.transaction(mySrd, input); + + // Verify + assertEquals(1, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); + IdType id = new IdType(output.getEntry().get(0).getResponse().getLocation()); + assertEquals("2", id.getVersionIdPart()); + + assertThrows(ResourceGoneException.class, () -> myPatientDao.read(id.toUnqualifiedVersionless(), mySrd)); + } + + @Test + public void testSinglePartitionPatch() { + IIdType id = createPatient(withId("A"), withActiveTrue()); + assertTrue(myPatientDao.read(id.toUnqualifiedVersionless(), mySrd).getActive()); + + Parameters patch = new Parameters(); + Parameters.ParametersParameterComponent operation = patch.addParameter(); + operation.setName("operation"); + operation + .addPart() + .setName("type") + .setValue(new CodeType("replace")); + operation + .addPart() + .setName("path") + .setValue(new StringType("Patient.active")); + operation + .addPart() + .setName("name") + .setValue(new CodeType("false")); + + BundleBuilder bb = new BundleBuilder(myFhirContext); + bb.addTransactionFhirPatchEntry(new IdType("Patient/A"), patch); + Bundle input = bb.getBundleTyped(); + + + // Test + myCaptureQueriesListener.clear(); + Bundle output = mySystemDao.transaction(mySrd, input); + + // Verify + assertEquals(1, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); + id = new IdType(output.getEntry().get(0).getResponse().getLocation()); + assertEquals("2", id.getVersionIdPart()); + + assertFalse(myPatientDao.read(id.toUnqualifiedVersionless(), mySrd).getActive()); + } + + @Test + public void testMultipleNonMatchingPartitions() { + BundleBuilder bb = new BundleBuilder(myFhirContext); + bb.addTransactionCreateEntry(newPatient()); + bb.addTransactionCreateEntry(newObservation()); + Bundle input = bb.getBundleTyped(); + + // Test + var e = assertThrows(InvalidRequestException.class, () -> mySystemDao.transaction(mySrd, input)); + assertThat(e.getMessage()).contains("HAPI-2541: Can not process transaction with 2 entries: Entries require access to multiple/conflicting partitions"); + + } + + private static @Nonnull Patient newPatient() { + Patient patient = new Patient(); + patient.setActive(true); + return patient; + } + + private static @Nonnull Observation newObservation() { + Observation observation = new Observation(); + observation.setStatus(Observation.ObservationStatus.FINAL); + return observation; + } + + + public class MyPartitionSelectorInterceptor { + + @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE) + public RequestPartitionId selectPartitionCreate(IBaseResource theResource) { + String resourceType = myFhirContext.getResourceType(theResource); + return selectPartition(resourceType); + } + + @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ) + public RequestPartitionId selectPartitionRead(ReadPartitionIdRequestDetails theDetails) { + return selectPartition(theDetails.getResourceType()); + } + + @Nonnull + private RequestPartitionId selectPartition(String theResourceType) { + switch (theResourceType) { + case "Patient": + return RequestPartitionId.fromPartitionId(myPartitionId); + case "Observation": + return RequestPartitionId.fromPartitionId(myPartitionId2); + case "SearchParameter": + case "Organization": + return RequestPartitionId.defaultPartition(); + default: + throw new InternalErrorException("Don't know how to handle resource type: " + theResourceType); + } + } + + } + + +} diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java index dd046428075..9eddb6767e8 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java @@ -661,6 +661,8 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { addCreatePartition(myPartitionId, myPartitionDate); addCreatePartition(myPartitionId, myPartitionDate); addCreatePartition(myPartitionId, myPartitionDate); + addCreatePartition(myPartitionId, myPartitionDate); + addCreatePartition(myPartitionId, myPartitionDate); Bundle input = new Bundle(); input.setType(Bundle.BundleType.TRANSACTION); @@ -2884,7 +2886,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { ourLog.info("About to start transaction"); - for (int i = 0; i < 40; i++) { + for (int i = 0; i < 60; i++) { addCreatePartition(1, null); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java index f5cc4817110..ceadc2c1709 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java @@ -61,6 +61,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java index c8a18f9c7e3..ec421c5affb 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/BaseTransactionProcessor.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; @@ -41,15 +42,18 @@ import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.delete.DeleteConflictUtil; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.entity.StorageSettings; import ca.uhn.fhir.jpa.model.search.StorageProcessingMessage; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; +import ca.uhn.fhir.model.valueset.BundleEntryTransactionMethodEnum; import ca.uhn.fhir.parser.DataFormatException; import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; @@ -86,6 +90,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.hl7.fhir.dstu3.model.Bundle; @@ -98,6 +103,7 @@ import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.IdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -141,6 +147,9 @@ public abstract class BaseTransactionProcessor { public static final Pattern INVALID_PLACEHOLDER_PATTERN = Pattern.compile("[a-zA-Z]+:.*"); private static final Logger ourLog = LoggerFactory.getLogger(BaseTransactionProcessor.class); + @Autowired + private IRequestPartitionHelperSvc myRequestPartitionHelperService; + @Autowired private PlatformTransactionManager myTxManager; @@ -163,6 +172,9 @@ public abstract class BaseTransactionProcessor { @Autowired private StorageSettings myStorageSettings; + @Autowired + PartitionSettings myPartitionSettings; + @Autowired private InMemoryResourceMatcher myInMemoryResourceMatcher; @@ -375,9 +387,6 @@ public abstract class BaseTransactionProcessor { long start = System.currentTimeMillis(); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - IBaseBundle response = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.BATCHRESPONSE.toCode()); Map responseMap = new ConcurrentHashMap<>(); @@ -701,9 +710,13 @@ public abstract class BaseTransactionProcessor { }; EntriesToProcessMap entriesToProcess; + RequestPartitionId requestPartitionId = + determineRequestPartitionIdForWriteEntries(theRequestDetails, theEntries); + try { entriesToProcess = myHapiTransactionService .withRequest(theRequestDetails) + .withRequestPartitionId(requestPartitionId) .withTransactionDetails(theTransactionDetails) .execute(txCallback); } finally { @@ -726,6 +739,82 @@ public abstract class BaseTransactionProcessor { } } + /** + * This method looks at the FHIR actions being performed in a List of bundle entries, + * and determines the associated request partitions. + */ + @Nullable + protected RequestPartitionId determineRequestPartitionIdForWriteEntries( + RequestDetails theRequestDetails, List theEntries) { + if (!myPartitionSettings.isPartitioningEnabled()) { + return RequestPartitionId.allPartitions(); + } + + RequestPartitionId retVal = null; + + for (var nextEntry : theEntries) { + RequestPartitionId nextRequestPartitionId = null; + String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry); + if (isNotBlank(verb)) { + BundleEntryTransactionMethodEnum verbEnum = BundleEntryTransactionMethodEnum.valueOf(verb); + switch (verbEnum) { + case GET: + continue; + case DELETE: { + String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry); + if (isNotBlank(requestUrl)) { + IdType id = new IdType(requestUrl); + String resourceType = id.getResourceType(); + ReadPartitionIdRequestDetails details = + ReadPartitionIdRequestDetails.forDelete(resourceType, id); + nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest( + theRequestDetails, details); + } + break; + } + case PATCH: { + String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry); + if (isNotBlank(requestUrl)) { + IdType id = new IdType(requestUrl); + String resourceType = id.getResourceType(); + ReadPartitionIdRequestDetails details = + ReadPartitionIdRequestDetails.forPatch(resourceType, id); + nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest( + theRequestDetails, details); + } + break; + } + case POST: + case PUT: { + IBaseResource resource = myVersionAdapter.getResource(nextEntry); + if (resource != null) { + String resourceType = myContext.getResourceType(resource); + nextRequestPartitionId = myRequestPartitionHelperService.determineCreatePartitionForRequest( + theRequestDetails, resource, resourceType); + } + } + } + } + + if (nextRequestPartitionId == null) { + // continue + } else if (retVal == null) { + retVal = nextRequestPartitionId; + } else if (!retVal.equals(nextRequestPartitionId)) { + if (myHapiTransactionService.isRequiresNewTransactionWhenChangingPartitions()) { + String msg = myContext + .getLocalizer() + .getMessage(BaseTransactionProcessor.class, "multiplePartitionAccesses", theEntries.size()); + throw new InvalidRequestException(Msg.code(2541) + msg); + } else { + retVal = retVal.mergeIds(nextRequestPartitionId); + } + } + } + + return retVal; + } + private boolean haveWriteOperationsHooks(RequestDetails theRequestDetails) { return CompositeInterceptorBroadcaster.hasHooks( Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails) @@ -2042,6 +2131,11 @@ public abstract class BaseTransactionProcessor { } } + @VisibleForTesting + public void setPartitionSettingsForUnitTest(PartitionSettings thePartitionSettings) { + myPartitionSettings = thePartitionSettings; + } + /** * Transaction Order, per the spec: *

diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index ac592d245e3..766d400f74c 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -76,6 +76,13 @@ public class HapiTransactionService implements IHapiTransactionService { private static final ThreadLocal ourRequestPartitionThreadLocal = new ThreadLocal<>(); private static final ThreadLocal ourExistingTransaction = new ThreadLocal<>(); + /** + * Default value for {@link #setTransactionPropagationWhenChangingPartitions(Propagation)} + * + * @since 7.6.0 + */ + public static final Propagation DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS = Propagation.REQUIRED; + @Autowired protected IInterceptorBroadcaster myInterceptorBroadcaster; @@ -88,7 +95,8 @@ public class HapiTransactionService implements IHapiTransactionService { @Autowired protected PartitionSettings myPartitionSettings; - private Propagation myTransactionPropagationWhenChangingPartitions = Propagation.REQUIRED; + private Propagation myTransactionPropagationWhenChangingPartitions = + DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS; private SleepUtil mySleepUtil = new SleepUtil(); @@ -264,7 +272,7 @@ public class HapiTransactionService implements IHapiTransactionService { try { ourExistingTransaction.set(this); - if (myTransactionPropagationWhenChangingPartitions == Propagation.REQUIRES_NEW) { + if (isRequiresNewTransactionWhenChangingPartitions()) { return executeInNewTransactionForPartitionChange( theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId); } else { @@ -276,6 +284,11 @@ public class HapiTransactionService implements IHapiTransactionService { } } + @Override + public boolean isRequiresNewTransactionWhenChangingPartitions() { + return myTransactionPropagationWhenChangingPartitions == Propagation.REQUIRES_NEW; + } + @Nullable private T executeInNewTransactionForPartitionChange( ExecutionBuilder theExecutionBuilder, @@ -567,7 +580,8 @@ public class HapiTransactionService implements IHapiTransactionService { return TransactionSynchronizationManager.isActualTransactionActive() && (!TransactionSynchronizationManager.isCurrentTransactionReadOnly() || theExecutionBuilder.myReadOnly) && (theExecutionBuilder.myPropagation == null - || theExecutionBuilder.myPropagation == Propagation.REQUIRED); + || theExecutionBuilder.myPropagation + == DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS); } @Nullable diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java index a3627d16083..7d4f18a1216 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java @@ -23,6 +23,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; import ca.uhn.fhir.util.ICallable; +import com.google.common.annotations.Beta; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import org.springframework.transaction.annotation.Isolation; @@ -90,6 +91,19 @@ public interface IHapiTransactionService { @Nonnull Isolation theIsolation, @Nonnull ICallable theCallback); + /** + * Returns {@literal true} if this transaction service will open a new + * transaction when the request partition is for a different partition than + * the currently executing partition. + *

+ * This is an experimental API, subject to change in a future release. + *

+ * + * @since 7.4.0 + */ + @Beta + boolean isRequiresNewTransactionWhenChangingPartitions(); + interface IExecutionBuilder extends TransactionOperations { IExecutionBuilder withIsolation(Isolation theIsolation);