Move partition-compatibility logic into tx service. (#6220)

This commit is contained in:
Michael Buckley 2024-08-20 18:55:09 -04:00 committed by GitHub
parent 947291e7f5
commit 709756d68d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 139 additions and 71 deletions

View File

@ -113,6 +113,11 @@ public class RequestPartitionId implements IModelJson {
return RequestPartitionId.allPartitions();
}
// don't know why this is required - otherwise PartitionedStrictTransactionR4Test fails
if (this.equals(theOther)) {
return this;
}
List<Integer> thisPartitionIds = getPartitionIds();
List<Integer> otherPartitionIds = theOther.getPartitionIds();
List<Integer> newPartitionIds = Stream.concat(thisPartitionIds.stream(), otherPartitionIds.stream())

View File

@ -750,69 +750,74 @@ public abstract class BaseTransactionProcessor {
return RequestPartitionId.allPartitions();
}
RequestPartitionId retVal = null;
return theEntries.stream()
.map(e -> getEntryRequestPartitionId(theRequestDetails, e))
.reduce(null, (accumulator, nextPartition) -> {
if (accumulator == null) {
return nextPartition;
} else if (nextPartition == null) {
return accumulator;
} else if (myHapiTransactionService.isCompatiblePartition(accumulator, nextPartition)) {
return accumulator.mergeIds(nextPartition);
} else {
String msg = myContext
.getLocalizer()
.getMessage(
BaseTransactionProcessor.class, "multiplePartitionAccesses", theEntries.size());
throw new InvalidRequestException(Msg.code(2541) + msg);
}
});
}
for (var nextEntry : theEntries) {
RequestPartitionId nextRequestPartitionId = null;
String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if (isNotBlank(verb)) {
BundleEntryTransactionMethodEnum verbEnum = BundleEntryTransactionMethodEnum.valueOf(verb);
switch (verbEnum) {
case GET:
continue;
case DELETE: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forDelete(resourceType, id);
nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
case PATCH: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forPatch(resourceType, id);
nextRequestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
case POST:
case PUT: {
IBaseResource resource = myVersionAdapter.getResource(nextEntry);
if (resource != null) {
String resourceType = myContext.getResourceType(resource);
nextRequestPartitionId = myRequestPartitionHelperService.determineCreatePartitionForRequest(
theRequestDetails, resource, resourceType);
}
@Nullable
private RequestPartitionId getEntryRequestPartitionId(RequestDetails theRequestDetails, IBase nextEntry) {
RequestPartitionId nextWriteEntryRequestPartitionId = null;
String verb = myVersionAdapter.getEntryRequestVerb(myContext, nextEntry);
if (isNotBlank(verb)) {
BundleEntryTransactionMethodEnum verbEnum = BundleEntryTransactionMethodEnum.valueOf(verb);
switch (verbEnum) {
case GET:
nextWriteEntryRequestPartitionId = null;
break;
case DELETE: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forDelete(resourceType, id);
nextWriteEntryRequestPartitionId =
myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
}
if (nextRequestPartitionId == null) {
// continue
} else if (retVal == null) {
retVal = nextRequestPartitionId;
} else if (!retVal.equals(nextRequestPartitionId)) {
if (myHapiTransactionService.isRequiresNewTransactionWhenChangingPartitions()) {
String msg = myContext
.getLocalizer()
.getMessage(BaseTransactionProcessor.class, "multiplePartitionAccesses", theEntries.size());
throw new InvalidRequestException(Msg.code(2541) + msg);
} else {
retVal = retVal.mergeIds(nextRequestPartitionId);
case PATCH: {
String requestUrl = myVersionAdapter.getEntryRequestUrl(nextEntry);
if (isNotBlank(requestUrl)) {
IdType id = new IdType(requestUrl);
String resourceType = id.getResourceType();
ReadPartitionIdRequestDetails details =
ReadPartitionIdRequestDetails.forPatch(resourceType, id);
nextWriteEntryRequestPartitionId =
myRequestPartitionHelperService.determineReadPartitionForRequest(
theRequestDetails, details);
}
break;
}
case POST:
case PUT: {
IBaseResource resource = myVersionAdapter.getResource(nextEntry);
if (resource != null) {
String resourceType = myContext.getResourceType(resource);
nextWriteEntryRequestPartitionId =
myRequestPartitionHelperService.determineCreatePartitionForRequest(
theRequestDetails, resource, resourceType);
}
}
}
}
return retVal;
return nextWriteEntryRequestPartitionId;
}
private boolean haveWriteOperationsHooks(RequestDetails theRequestDetails) {

View File

@ -256,8 +256,7 @@ public class HapiTransactionService implements IHapiTransactionService {
}
ourLog.trace("Starting doExecute for RequestPartitionId {}", requestPartitionId);
if (!myPartitionSettings.isPartitioningEnabled()
|| Objects.equals(previousRequestPartitionId, requestPartitionId)) {
if (isCompatiblePartition(previousRequestPartitionId, requestPartitionId)) {
if (ourExistingTransaction.get() == this && canReuseExistingTransaction(theExecutionBuilder)) {
/*
* If we're already in an active transaction, and it's for the right partition,
@ -284,11 +283,18 @@ public class HapiTransactionService implements IHapiTransactionService {
}
}
@Override
public boolean isRequiresNewTransactionWhenChangingPartitions() {
protected boolean isRequiresNewTransactionWhenChangingPartitions() {
return myTransactionPropagationWhenChangingPartitions == Propagation.REQUIRES_NEW;
}
@Override
public boolean isCompatiblePartition(
RequestPartitionId theRequestPartitionId, RequestPartitionId theOtherRequestPartitionId) {
return !myPartitionSettings.isPartitioningEnabled()
|| !isRequiresNewTransactionWhenChangingPartitions()
|| Objects.equals(theRequestPartitionId, theOtherRequestPartitionId);
}
@Nullable
private <T> T executeInNewTransactionForPartitionChange(
ExecutionBuilder theExecutionBuilder,

View File

@ -92,9 +92,7 @@ public interface IHapiTransactionService {
@Nonnull ICallable<T> theCallback);
/**
* Returns {@literal true} if this transaction service will open a new
* transaction when the request partition is for a different partition than
* the currently executing partition.
* Are two RequestPartitionId values compatible within the same transaction?
* <p>
* This is an experimental API, subject to change in a future release.
* </p>
@ -102,7 +100,10 @@ public interface IHapiTransactionService {
* @since 7.4.0
*/
@Beta
boolean isRequiresNewTransactionWhenChangingPartitions();
default boolean isCompatiblePartition(
RequestPartitionId theRequestPartitionId, RequestPartitionId theOtherRequestPartitionId) {
return true;
}
interface IExecutionBuilder extends TransactionOperations {

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.dao.tx;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
@ -13,6 +14,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.SleepUtil;
import org.hibernate.exception.ConstraintViolationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@ -25,6 +27,7 @@ import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import java.sql.SQLException;
@ -32,7 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@ -54,8 +59,8 @@ class HapiTransactionServiceTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvcMock;
@Mock
private PartitionSettings myPartitionSettingsMock;
private final PartitionSettings myPartitionSettings = new PartitionSettings();
@Mock
private SleepUtil mySleepUtilMock;
@ -68,7 +73,7 @@ class HapiTransactionServiceTest {
myHapiTransactionService = new HapiTransactionService();
myHapiTransactionService.setTransactionManager(myTransactionManagerMock);
myHapiTransactionService.setInterceptorBroadcaster(myInterceptorBroadcasterMock);
myHapiTransactionService.setPartitionSettingsForUnitTest(myPartitionSettingsMock);
myHapiTransactionService.setPartitionSettingsForUnitTest(myPartitionSettings);
myHapiTransactionService.setRequestPartitionSvcForUnitTest(myRequestPartitionHelperSvcMock);
myHapiTransactionService.setSleepUtil(mySleepUtilMock);
mockInterceptorBroadcaster();
@ -112,9 +117,7 @@ class HapiTransactionServiceTest {
throw theException;
};
Exception theExceptionThrownByDoExecute = assertThrows(Exception.class, () -> {
myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback);
});
Exception theExceptionThrownByDoExecute = assertThrows(Exception.class, () -> myHapiTransactionService.doExecute((HapiTransactionService.ExecutionBuilder) executionBuilder, transactionCallback));
assertEquals(theExpectedNumberOfCallsToTransactionCallback, numberOfCalls.get());
verify(mySleepUtilMock, times(theExpectedNumberOfCallsToTransactionCallback - 1))
@ -191,4 +194,52 @@ class HapiTransactionServiceTest {
verify(mySleepUtilMock, times(2))
.sleepAtLeast(anyLong(), anyBoolean());
}
/**
* When are two RequestPartitionIds compatible, under different config?
*/
@Nested
class PartitionCompatibility {
RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
@Test
void unPartitioned_anythingIsCompatibleWithAnything() {
// given
myPartitionSettings.setPartitioningEnabled(false);
myHapiTransactionService.setTransactionPropagationWhenChangingPartitions(Propagation.REQUIRED);
assertTrue(myHapiTransactionService.isCompatiblePartition(null, partition1));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, null));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, partition1));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, partition2));
}
@Test
void partitionedDefault_anythingIsCompatibleWithAnything() {
// given
myPartitionSettings.setPartitioningEnabled(true);
myHapiTransactionService.setTransactionPropagationWhenChangingPartitions(Propagation.REQUIRED);
assertTrue(myHapiTransactionService.isCompatiblePartition(null, partition1));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, null));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, partition1));
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, partition2));
}
/**
* When changing partition requires a new transaction, only null and identical partitions are allowed.
*/
@Test
void partitionedRequiresNew_partitionIsOnlyCompatibleWithSelf() {
// given
myPartitionSettings.setPartitioningEnabled(true);
myHapiTransactionService.setTransactionPropagationWhenChangingPartitions(Propagation.REQUIRES_NEW);
assertTrue(myHapiTransactionService.isCompatiblePartition(partition1, partition1));
assertFalse(myHapiTransactionService.isCompatiblePartition(partition1, partition2));
}
}
}