Partition aware transactions (#6167)

* Partition aware transactions

* Address review comments

* Test fixes

* Remove dead issue field

* Test fixes

---------

Co-authored-by: Tadgh <garygrantgraham@gmail.com>
This commit is contained in:
James Agnew 2024-08-09 11:51:20 -04:00 committed by GitHub
parent eba2d45786
commit a894779760
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 470 additions and 44 deletions

View File

@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -98,6 +99,28 @@ public class RequestPartitionId implements IModelJson {
myAllPartitions = true;
}
/**
* Creates a new RequestPartitionId which includes all partition IDs from
* this {@link RequestPartitionId} but also includes all IDs from the given
* {@link RequestPartitionId}. Any duplicates are only included once, and
* partition names and dates are ignored and not returned. This {@link RequestPartitionId}
* and {@literal theOther} are not modified.
*
* @since 7.4.0
*/
public RequestPartitionId mergeIds(RequestPartitionId theOther) {
if (isAllPartitions() || theOther.isAllPartitions()) {
return RequestPartitionId.allPartitions();
}
List<Integer> thisPartitionIds = getPartitionIds();
List<Integer> otherPartitionIds = theOther.getPartitionIds();
List<Integer> newPartitionIds = Stream.concat(thisPartitionIds.stream(), otherPartitionIds.stream())
.distinct()
.collect(Collectors.toList());
return RequestPartitionId.fromPartitionIds(newPartitionIds);
}
public static RequestPartitionId fromJson(String theJson) throws JsonProcessingException {
return ourObjectMapper.readValue(theJson, RequestPartitionId.class);
}

View File

@ -1,3 +1,22 @@
/*-
* #%L
* HAPI FHIR - Core Library
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.repository;
import ca.uhn.fhir.context.FhirContext;

View File

@ -94,6 +94,7 @@ ca.uhn.fhir.jpa.dao.BaseStorageDao.inlineMatchNotSupported=Inline match URLs are
ca.uhn.fhir.jpa.dao.BaseStorageDao.transactionOperationWithMultipleMatchFailure=Failed to {0} resource with match URL "{1}" because this search matched {2} resources
ca.uhn.fhir.jpa.dao.BaseStorageDao.deleteByUrlThresholdExceeded=Failed to DELETE resources with match URL "{0}" because the resolved number of resources: {1} exceeds the threshold of {2}
ca.uhn.fhir.jpa.dao.BaseStorageDao.transactionOperationWithIdNotMatchFailure=Failed to {0} resource with match URL "{1}" because the matching resource does not match the provided ID
ca.uhn.fhir.jpa.dao.BaseTransactionProcessor.multiplePartitionAccesses=Can not process transaction with {0} entries: Entries require access to multiple/conflicting partitions
ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.transactionOperationFailedNoId=Failed to {0} resource in transaction because no ID was provided
ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.transactionOperationFailedUnknownId=Failed to {0} resource in transaction because no resource could be found with ID {1}
ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.uniqueIndexConflictFailure=Can not create resource of type {0} as it would create a duplicate unique index matching query: {1} (existing index belongs to {2}, new unique index created by {3})

View File

@ -41,6 +41,50 @@ public class RequestPartitionIdTest {
assertFalse(RequestPartitionId.forPartitionIdsAndNames(null, Lists.newArrayList(1, 2), null).isDefaultPartition());
}
@Test
public void testMergeIds() {
RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3);
RequestPartitionId input1 = RequestPartitionId.fromPartitionIds(1, 2, 4);
RequestPartitionId actual = input0.mergeIds(input1);
RequestPartitionId expected = RequestPartitionId.fromPartitionIds(1, 2, 3, 4);
assertEquals(expected, actual);
}
@Test
public void testMergeIds_ThisAllPartitions() {
RequestPartitionId input0 = RequestPartitionId.allPartitions();
RequestPartitionId input1 = RequestPartitionId.fromPartitionIds(1, 2, 4);
RequestPartitionId actual = input0.mergeIds(input1);
RequestPartitionId expected = RequestPartitionId.allPartitions();
assertEquals(expected, actual);
}
@Test
public void testMergeIds_OtherAllPartitions() {
RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3);
RequestPartitionId input1 = RequestPartitionId.allPartitions();
RequestPartitionId actual = input0.mergeIds(input1);
RequestPartitionId expected = RequestPartitionId.allPartitions();
assertEquals(expected, actual);
}
@Test
public void testMergeIds_IncludesDefault() {
RequestPartitionId input0 = RequestPartitionId.fromPartitionIds(1, 2, 3);
RequestPartitionId input1 = RequestPartitionId.defaultPartition();
RequestPartitionId actual = input0.mergeIds(input1);
RequestPartitionId expected = RequestPartitionId.fromPartitionIds(1, 2, 3, null);
assertEquals(expected, actual);
}
@Test
public void testSerDeserSer() throws JsonProcessingException {
{

View File

@ -0,0 +1,7 @@
---
type: fix
jira: SMILE-8652
title: "When JPA servers are configured to always require a new database
transaction when switching partitions, the server will now correctly
identify the correct partition for FHIR transaction operations, and
fail the operation if multiple partitions would be required."

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
@ -97,9 +96,6 @@ public class TransactionProcessor extends BaseTransactionProcessor {
@Autowired
private IIdHelperService<JpaPid> myIdHelperService;
@Autowired
private PartitionSettings myPartitionSettings;
@Autowired
private JpaStorageSettings myStorageSettings;
@ -150,14 +146,9 @@ public class TransactionProcessor extends BaseTransactionProcessor {
List<IBase> theEntries,
StopWatch theTransactionStopWatch) {
ITransactionProcessorVersionAdapter versionAdapter = getVersionAdapter();
RequestPartitionId requestPartitionId = null;
if (!myPartitionSettings.isPartitioningEnabled()) {
requestPartitionId = RequestPartitionId.allPartitions();
} else {
// If all entries in the transaction point to the exact same partition, we'll try and do a pre-fetch
requestPartitionId = getSinglePartitionForAllEntriesOrNull(theRequest, theEntries, versionAdapter);
}
ITransactionProcessorVersionAdapter<?, ?> versionAdapter = getVersionAdapter();
RequestPartitionId requestPartitionId =
super.determineRequestPartitionIdForWriteEntries(theRequest, theEntries);
if (requestPartitionId != null) {
preFetch(theTransactionDetails, theEntries, versionAdapter, requestPartitionId);
@ -472,24 +463,6 @@ public class TransactionProcessor extends BaseTransactionProcessor {
}
}
private RequestPartitionId getSinglePartitionForAllEntriesOrNull(
RequestDetails theRequest, List<IBase> theEntries, ITransactionProcessorVersionAdapter versionAdapter) {
RequestPartitionId retVal = null;
Set<RequestPartitionId> requestPartitionIdsForAllEntries = new HashSet<>();
for (IBase nextEntry : theEntries) {
IBaseResource resource = versionAdapter.getResource(nextEntry);
if (resource != null) {
RequestPartitionId requestPartition = myRequestPartitionSvc.determineCreatePartitionForRequest(
theRequest, resource, myFhirContext.getResourceType(resource));
requestPartitionIdsForAllEntries.add(requestPartition);
}
}
if (requestPartitionIdsForAllEntries.size() == 1) {
retVal = requestPartitionIdsForAllEntries.iterator().next();
}
return retVal;
}
/**
* Given a token parameter, build the query predicate based on its hash. Uses system and value if both are available, otherwise just value.
* If neither are available, it returns null.
@ -570,11 +543,6 @@ public class TransactionProcessor extends BaseTransactionProcessor {
}
}
@VisibleForTesting
public void setPartitionSettingsForUnitTest(PartitionSettings thePartitionSettings) {
myPartitionSettings = thePartitionSettings;
}
@VisibleForTesting
public void setIdHelperServiceForUnitTest(IIdHelperService theIdHelperService) {
myIdHelperService = theIdHelperService;

View File

@ -113,6 +113,24 @@ public class ReadPartitionIdRequestDetails extends PartitionIdRequestDetails {
null, RestOperationTypeEnum.EXTENDED_OPERATION_SERVER, null, null, null, null, theOperationName);
}
/**
* @since 7.4.0
*/
public static ReadPartitionIdRequestDetails forDelete(@Nonnull String theResourceType, @Nonnull IIdType theId) {
RestOperationTypeEnum op = RestOperationTypeEnum.DELETE;
return new ReadPartitionIdRequestDetails(
theResourceType, op, theId.withResourceType(theResourceType), null, null, null, null);
}
/**
* @since 7.4.0
*/
public static ReadPartitionIdRequestDetails forPatch(String theResourceType, IIdType theId) {
RestOperationTypeEnum op = RestOperationTypeEnum.PATCH;
return new ReadPartitionIdRequestDetails(
theResourceType, op, theId.withResourceType(theResourceType), null, null, null, null);
}
public static ReadPartitionIdRequestDetails forRead(
String theResourceType, @Nonnull IIdType theId, boolean theIsVread) {
RestOperationTypeEnum op = theIsVread ? RestOperationTypeEnum.VREAD : RestOperationTypeEnum.READ;

View File

@ -55,7 +55,7 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
@AfterEach
public void after() {
myPartitionInterceptor.assertNoRemainingIds();
assertNoRemainingPartitionIds();
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
@ -70,6 +70,10 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
myStorageSettings.setMatchUrlCacheEnabled(new JpaStorageSettings().getMatchUrlCache());
}
protected void assertNoRemainingPartitionIds() {
myPartitionInterceptor.assertNoRemainingIds();
}
@Override
@BeforeEach
public void before() throws Exception {
@ -89,7 +93,8 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
myPartitionId4 = 4;
myPartitionInterceptor = new MyReadWriteInterceptor();
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
registerPartitionInterceptor();
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(myPartitionId).setName(PARTITION_1), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(myPartitionId2).setName(PARTITION_2), null);
@ -106,6 +111,11 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
for (int i = 1; i <= 4; i++) {
myPartitionConfigSvc.getPartitionById(i);
}
}
protected void registerPartitionInterceptor() {
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
}
@Override

View File

@ -0,0 +1,211 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.BundleBuilder;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PartitionedStrictTransactionR4Test extends BasePartitioningR4Test {
@Autowired
private HapiTransactionService myTransactionService;
@Override
public void before() throws Exception {
super.before();
myTransactionService.setTransactionPropagationWhenChangingPartitions(Propagation.REQUIRES_NEW);
}
@Override
public void after() {
super.after();
myTransactionService.setTransactionPropagationWhenChangingPartitions(HapiTransactionService.DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS);
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyPartitionSelectorInterceptor);
}
/**
* We manually register {@link MyPartitionSelectorInterceptor} for this test class
* as the partition interceptor
*/
@Override
protected void registerPartitionInterceptor() {
myInterceptorRegistry.registerInterceptor(new MyPartitionSelectorInterceptor());
}
@Override
protected void assertNoRemainingPartitionIds() {
// We don't use the superclass to manage partition IDs
}
@ParameterizedTest
@CsvSource({
"batch , 2",
"transaction , 1",
})
public void testSinglePartitionCreate(String theBundleType, int theExpectedCommitCount) {
BundleBuilder bb = new BundleBuilder(myFhirContext);
bb.addTransactionCreateEntry(newPatient());
bb.addTransactionCreateEntry(newPatient());
bb.setType(theBundleType);
Bundle input = bb.getBundleTyped();
// Test
myCaptureQueriesListener.clear();
Bundle output = mySystemDao.transaction(mySrd, input);
// Verify
assertEquals(theExpectedCommitCount, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
IdType id = new IdType(output.getEntry().get(0).getResponse().getLocation());
Patient actualPatient = myPatientDao.read(id, mySrd);
RequestPartitionId actualPartitionId = (RequestPartitionId) actualPatient.getUserData(Constants.RESOURCE_PARTITION_ID);
assertThat(actualPartitionId.getPartitionIds()).containsExactly(myPartitionId);
}
@Test
public void testSinglePartitionDelete() {
createPatient(withId("A"), withActiveTrue());
BundleBuilder bb = new BundleBuilder(myFhirContext);
bb.addTransactionDeleteEntry(new IdType("Patient/A"));
Bundle input = bb.getBundleTyped();
// Test
myCaptureQueriesListener.clear();
Bundle output = mySystemDao.transaction(mySrd, input);
// Verify
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
IdType id = new IdType(output.getEntry().get(0).getResponse().getLocation());
assertEquals("2", id.getVersionIdPart());
assertThrows(ResourceGoneException.class, () -> myPatientDao.read(id.toUnqualifiedVersionless(), mySrd));
}
@Test
public void testSinglePartitionPatch() {
IIdType id = createPatient(withId("A"), withActiveTrue());
assertTrue(myPatientDao.read(id.toUnqualifiedVersionless(), mySrd).getActive());
Parameters patch = new Parameters();
Parameters.ParametersParameterComponent operation = patch.addParameter();
operation.setName("operation");
operation
.addPart()
.setName("type")
.setValue(new CodeType("replace"));
operation
.addPart()
.setName("path")
.setValue(new StringType("Patient.active"));
operation
.addPart()
.setName("name")
.setValue(new CodeType("false"));
BundleBuilder bb = new BundleBuilder(myFhirContext);
bb.addTransactionFhirPatchEntry(new IdType("Patient/A"), patch);
Bundle input = bb.getBundleTyped();
// Test
myCaptureQueriesListener.clear();
Bundle output = mySystemDao.transaction(mySrd, input);
// Verify
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
id = new IdType(output.getEntry().get(0).getResponse().getLocation());
assertEquals("2", id.getVersionIdPart());
assertFalse(myPatientDao.read(id.toUnqualifiedVersionless(), mySrd).getActive());
}
@Test
public void testMultipleNonMatchingPartitions() {
BundleBuilder bb = new BundleBuilder(myFhirContext);
bb.addTransactionCreateEntry(newPatient());
bb.addTransactionCreateEntry(newObservation());
Bundle input = bb.getBundleTyped();
// Test
var e = assertThrows(InvalidRequestException.class, () -> mySystemDao.transaction(mySrd, input));
assertThat(e.getMessage()).contains("HAPI-2541: Can not process transaction with 2 entries: Entries require access to multiple/conflicting partitions");
}
private static @Nonnull Patient newPatient() {
Patient patient = new Patient();
patient.setActive(true);
return patient;
}
private static @Nonnull Observation newObservation() {
Observation observation = new Observation();
observation.setStatus(Observation.ObservationStatus.FINAL);
return observation;
}
public class MyPartitionSelectorInterceptor {
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId selectPartitionCreate(IBaseResource theResource) {
String resourceType = myFhirContext.getResourceType(theResource);
return selectPartition(resourceType);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId selectPartitionRead(ReadPartitionIdRequestDetails theDetails) {
return selectPartition(theDetails.getResourceType());
}
@Nonnull
private RequestPartitionId selectPartition(String theResourceType) {
switch (theResourceType) {
case "Patient":
return RequestPartitionId.fromPartitionId(myPartitionId);
case "Observation":
return RequestPartitionId.fromPartitionId(myPartitionId2);
case "SearchParameter":
case "Organization":
return RequestPartitionId.defaultPartition();
default:
throw new InternalErrorException("Don't know how to handle resource type: " + theResourceType);
}
}
}
}

View File

@ -661,6 +661,8 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
addCreatePartition(myPartitionId, myPartitionDate);
addCreatePartition(myPartitionId, myPartitionDate);
addCreatePartition(myPartitionId, myPartitionDate);
addCreatePartition(myPartitionId, myPartitionDate);
addCreatePartition(myPartitionId, myPartitionDate);
Bundle input = new Bundle();
input.setType(Bundle.BundleType.TRANSACTION);
@ -2884,7 +2886,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
ourLog.info("About to start transaction");
for (int i = 0; i < 40; i++) {
for (int i = 0; i < 60; i++) {
addCreatePartition(1, null);
}

View File

@ -61,6 +61,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
@ -41,15 +42,18 @@ import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.delete.DeleteConflictUtil;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.search.StorageProcessingMessage;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.valueset.BundleEntryTransactionMethodEnum;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
@ -86,6 +90,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu3.model.Bundle;
@ -98,6 +103,7 @@ import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.IdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -141,6 +147,9 @@ public abstract class BaseTransactionProcessor {
public static final Pattern INVALID_PLACEHOLDER_PATTERN = Pattern.compile("[a-zA-Z]+:.*");
private static final Logger ourLog = LoggerFactory.getLogger(BaseTransactionProcessor.class);
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperService;
@Autowired
private PlatformTransactionManager myTxManager;
@ -163,6 +172,9 @@ public abstract class BaseTransactionProcessor {
@Autowired
private StorageSettings myStorageSettings;
@Autowired
PartitionSettings myPartitionSettings;
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
@ -375,9 +387,6 @@ public abstract class BaseTransactionProcessor {
long start = System.currentTimeMillis();
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
IBaseBundle response =
myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.BATCHRESPONSE.toCode());
Map<Integer, Object> responseMap = new ConcurrentHashMap<>();
@ -701,9 +710,13 @@ public abstract class BaseTransactionProcessor {
};
EntriesToProcessMap entriesToProcess;
RequestPartitionId requestPartitionId =
determineRequestPartitionIdForWriteEntries(theRequestDetails, theEntries);
try {
entriesToProcess = myHapiTransactionService
.withRequest(theRequestDetails)
.withRequestPartitionId(requestPartitionId)
.withTransactionDetails(theTransactionDetails)
.execute(txCallback);
} finally {
@ -726,6 +739,82 @@ public abstract class BaseTransactionProcessor {
}
}
/**
* This method looks at the FHIR actions being performed in a List of bundle entries,
* and determines the associated request partitions.
*/
@Nullable
protected RequestPartitionId determineRequestPartitionIdForWriteEntries(
RequestDetails theRequestDetails, List<IBase> theEntries) {
if (!myPartitionSettings.isPartitioningEnabled()) {
return RequestPartitionId.allPartitions();
}
RequestPartitionId retVal = null;
for (var nextEntry : theEntries) {
RequestPartitionId nextRequestPartitionId = null;
String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if (isNotBlank(verb)) {
BundleEntryTransactionMethodEnum verbEnum = BundleEntryTransactionMethodEnum.valueOf(verb);
switch (verbEnum) {
case GET:
continue;
case DELETE: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forDelete(resourceType, id);
nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
case PATCH: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forPatch(resourceType, id);
nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
case POST:
case PUT: {
IBaseResource resource = myVersionAdapter.getResource(nextEntry);
if (resource != null) {
String resourceType = myContext.getResourceType(resource);
nextRequestPartitionId = myRequestPartitionHelperService.determineCreatePartitionForRequest(
theRequestDetails, resource, resourceType);
}
}
}
}
if (nextRequestPartitionId == null) {
// continue
} else if (retVal == null) {
retVal = nextRequestPartitionId;
} else if (!retVal.equals(nextRequestPartitionId)) {
if (myHapiTransactionService.isRequiresNewTransactionWhenChangingPartitions()) {
String msg = myContext
.getLocalizer()
.getMessage(BaseTransactionProcessor.class, "multiplePartitionAccesses", theEntries.size());
throw new InvalidRequestException(Msg.code(2541) + msg);
} else {
retVal = retVal.mergeIds(nextRequestPartitionId);
}
}
}
return retVal;
}
private boolean haveWriteOperationsHooks(RequestDetails theRequestDetails) {
return CompositeInterceptorBroadcaster.hasHooks(
Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE, myInterceptorBroadcaster, theRequestDetails)
@ -2042,6 +2131,11 @@ public abstract class BaseTransactionProcessor {
}
}
@VisibleForTesting
public void setPartitionSettingsForUnitTest(PartitionSettings thePartitionSettings) {
myPartitionSettings = thePartitionSettings;
}
/**
* Transaction Order, per the spec:
* <p>

View File

@ -76,6 +76,13 @@ public class HapiTransactionService implements IHapiTransactionService {
private static final ThreadLocal<RequestPartitionId> ourRequestPartitionThreadLocal = new ThreadLocal<>();
private static final ThreadLocal<HapiTransactionService> ourExistingTransaction = new ThreadLocal<>();
/**
* Default value for {@link #setTransactionPropagationWhenChangingPartitions(Propagation)}
*
* @since 7.6.0
*/
public static final Propagation DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS = Propagation.REQUIRED;
@Autowired
protected IInterceptorBroadcaster myInterceptorBroadcaster;
@ -88,7 +95,8 @@ public class HapiTransactionService implements IHapiTransactionService {
@Autowired
protected PartitionSettings myPartitionSettings;
private Propagation myTransactionPropagationWhenChangingPartitions = Propagation.REQUIRED;
private Propagation myTransactionPropagationWhenChangingPartitions =
DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS;
private SleepUtil mySleepUtil = new SleepUtil();
@ -264,7 +272,7 @@ public class HapiTransactionService implements IHapiTransactionService {
try {
ourExistingTransaction.set(this);
if (myTransactionPropagationWhenChangingPartitions == Propagation.REQUIRES_NEW) {
if (isRequiresNewTransactionWhenChangingPartitions()) {
return executeInNewTransactionForPartitionChange(
theExecutionBuilder, theCallback, requestPartitionId, previousRequestPartitionId);
} else {
@ -276,6 +284,11 @@ public class HapiTransactionService implements IHapiTransactionService {
}
}
@Override
public boolean isRequiresNewTransactionWhenChangingPartitions() {
return myTransactionPropagationWhenChangingPartitions == Propagation.REQUIRES_NEW;
}
@Nullable
private <T> T executeInNewTransactionForPartitionChange(
ExecutionBuilder theExecutionBuilder,
@ -567,7 +580,8 @@ public class HapiTransactionService implements IHapiTransactionService {
return TransactionSynchronizationManager.isActualTransactionActive()
&& (!TransactionSynchronizationManager.isCurrentTransactionReadOnly() || theExecutionBuilder.myReadOnly)
&& (theExecutionBuilder.myPropagation == null
|| theExecutionBuilder.myPropagation == Propagation.REQUIRED);
|| theExecutionBuilder.myPropagation
== DEFAULT_TRANSACTION_PROPAGATION_WHEN_CHANGING_PARTITIONS);
}
@Nullable

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.util.ICallable;
import com.google.common.annotations.Beta;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.springframework.transaction.annotation.Isolation;
@ -90,6 +91,19 @@ public interface IHapiTransactionService {
@Nonnull Isolation theIsolation,
@Nonnull ICallable<T> theCallback);
/**
* Returns {@literal true} if this transaction service will open a new
* transaction when the request partition is for a different partition than
* the currently executing partition.
* <p>
* This is an experimental API, subject to change in a future release.
* </p>
*
* @since 7.4.0
*/
@Beta
boolean isRequiresNewTransactionWhenChangingPartitions();
interface IExecutionBuilder extends TransactionOperations {
IExecutionBuilder withIsolation(Isolation theIsolation);