diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5344-expunge-failing-when-executing-on-partition.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5344-expunge-failing-when-executing-on-partition.yaml new file mode 100644 index 00000000000..61ffdfbdaf6 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5344-expunge-failing-when-executing-on-partition.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 5344 +jira: SMILE-7324 +title: "Previously, issuing an expunge operation for resources on a specific partition would fail. This problem has been fixed." diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ExpungeOperationTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ExpungeOperationTest.java new file mode 100644 index 00000000000..1ef53f61b23 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/ExpungeOperationTest.java @@ -0,0 +1,91 @@ +package ca.uhn.fhir.jpa.dao.tx; + +import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; +import ca.uhn.fhir.jpa.api.model.ExpungeOptions; +import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation; +import ca.uhn.fhir.jpa.dao.expunge.IResourceExpungeService; +import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.jpa.svc.MockHapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ExpungeOperationTest { + + @Captor + private ArgumentCaptor myBuilderArgumentCaptor; + @Spy + private MockHapiTransactionService myHapiTransactionService; + private JpaStorageSettings myStorageSettings; + @Mock + private IResourceExpungeService myIResourceExpungeService; + private static final String ourExpectedTenantId = "TenantA"; + + @BeforeEach + public void beforeEach(){ + myStorageSettings = new JpaStorageSettings(); + } + + @Test + public void testExpunge_onSpecificTenant_willPerformExpungeOnSpecificTenant(){ + // given + when(myIResourceExpungeService.findHistoricalVersionsOfDeletedResources(any(), any(), anyInt())).thenReturn(List.of(JpaPid.fromId(1l))); + when(myIResourceExpungeService.findHistoricalVersionsOfNonDeletedResources(any(), any(), anyInt())).thenReturn(List.of(JpaPid.fromId(1l))); + myStorageSettings.setExpungeBatchSize(5); + + RequestDetails requestDetails = getRequestDetails(); + ExpungeOptions expungeOptions = new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true); + + ExpungeOperation expungeOperation = new ExpungeOperation("Patient", null, expungeOptions, requestDetails); + + expungeOperation.setHapiTransactionServiceForTesting(myHapiTransactionService); + expungeOperation.setStorageSettingsForTesting(myStorageSettings); + expungeOperation.setExpungeDaoServiceForTesting(myIResourceExpungeService); + + expungeOperation.call(); + + // then + assertTransactionServiceWasInvokedWithTenantId(ourExpectedTenantId); + } + + private void assertTransactionServiceWasInvokedWithTenantId(String theExpectedTenantId) { + // we have set the expungeOptions to setExpungeDeletedResources and SetExpungeOldVersions to true. + // as a result, we will be making 5 trips to the db. let's make sure that each trip was done with + // the hapiTransaction service and that the tenantId was specified. + verify(myHapiTransactionService, times(5)).doExecute(myBuilderArgumentCaptor.capture(), any()); + List methodArgumentExecutionBuilders = myBuilderArgumentCaptor.getAllValues(); + + boolean allMatching = methodArgumentExecutionBuilders.stream() + .map(HapiTransactionService.ExecutionBuilder::getRequestDetailsForTesting) + .map(RequestDetails::getTenantId) + .allMatch(theExpectedTenantId::equals); + + assertThat(allMatching, is(equalTo(true))); + } + + private RequestDetails getRequestDetails() { + RequestDetails requestDetails = new ServletRequestDetails(); + requestDetails.setTenantId(ourExpectedTenantId); + return requestDetails; + } + +} diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/PartitionAwareSupplierTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/PartitionAwareSupplierTest.java new file mode 100644 index 00000000000..b9352eceea9 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/tx/PartitionAwareSupplierTest.java @@ -0,0 +1,67 @@ +package ca.uhn.fhir.jpa.dao.tx; + +import ca.uhn.fhir.jpa.dao.expunge.PartitionAwareSupplier; +import ca.uhn.fhir.jpa.svc.MockHapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class PartitionAwareSupplierTest { + + @Spy + private MockHapiTransactionService myHapiTransactionService; + + @Captor + private ArgumentCaptor myBuilderArgumentCaptor; + + private static final String ourExpectedTenantId = "TenantA"; + + @Test + public void testMethodFindInPartitionedContext_withRequestDetailsHavingTenantId_willExecuteOnSpecifiedPartition(){ + RequestDetails requestDetails = getRequestDetails(); + + PartitionAwareSupplier partitionAwareSupplier = new PartitionAwareSupplier(myHapiTransactionService, requestDetails); + partitionAwareSupplier.supplyInPartitionedContext(getResourcePersistentIdSupplier()); + + assertTransactionServiceWasInvokedWithTenantId(ourExpectedTenantId); + + } + + private Supplier> getResourcePersistentIdSupplier(){ + return () -> Collections.emptyList(); + } + + private void assertTransactionServiceWasInvokedWithTenantId(String theExpectedTenantId) { + verify(myHapiTransactionService, times(1)).doExecute(myBuilderArgumentCaptor.capture(), any()); + HapiTransactionService.ExecutionBuilder methodArgumentExecutionBuilder = myBuilderArgumentCaptor.getValue(); + + String requestDetailsTenantId = methodArgumentExecutionBuilder.getRequestDetailsForTesting().getTenantId(); + + assertThat(requestDetailsTenantId, is(equalTo(theExpectedTenantId))); + } + + private RequestDetails getRequestDetails() { + RequestDetails requestDetails = new ServletRequestDetails(); + requestDetails.setTenantId(ourExpectedTenantId); + return requestDetails; + } + +} diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/svc/MockHapiTransactionService.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/svc/MockHapiTransactionService.java index 29c2eae81ea..8f7a5b86aad 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/svc/MockHapiTransactionService.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/svc/MockHapiTransactionService.java @@ -40,7 +40,7 @@ public class MockHapiTransactionService extends HapiTransactionService { @Nullable @Override - protected T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback theCallback) { + public T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback theCallback) { return theCallback.doInTransaction(myTransactionStatus); } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ExpungeOperation.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ExpungeOperation.java index 0a8ceb54240..9af4034046f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ExpungeOperation.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/ExpungeOperation.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.api.model.ExpungeOutcome; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,7 +44,7 @@ public class ExpungeOperation implements Callable { public static final String THREAD_PREFIX = "expunge"; @Autowired - private IResourceExpungeService myExpungeDaoService; + private IResourceExpungeService myResourceExpungeService; @Autowired private JpaStorageSettings myStorageSettings; @@ -101,17 +102,14 @@ public class ExpungeOperation implements Callable { } private List findHistoricalVersionsOfDeletedResources() { - List retVal = myExpungeDaoService.findHistoricalVersionsOfDeletedResources( - myResourceName, myResourceId, myRemainingCount.get()); + List retVal = getPartitionAwareSupplier() + .supplyInPartitionedContext(() -> myResourceExpungeService.findHistoricalVersionsOfDeletedResources( + myResourceName, myResourceId, myRemainingCount.get())); + ourLog.debug("Found {} historical versions", retVal.size()); return retVal; } - private List findHistoricalVersionsOfNonDeletedResources() { - return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources( - myResourceName, myResourceId, myRemainingCount.get()); - } - private boolean expungeLimitReached() { boolean expungeLimitReached = myRemainingCount.get() <= 0; if (expungeLimitReached) { @@ -121,15 +119,21 @@ public class ExpungeOperation implements Callable { } private void expungeOldVersions() { - List historicalIds = findHistoricalVersionsOfNonDeletedResources(); + List historicalIds = getPartitionAwareSupplier() + .supplyInPartitionedContext(() -> myResourceExpungeService.findHistoricalVersionsOfNonDeletedResources( + myResourceName, myResourceId, myRemainingCount.get())); getPartitionRunner() .runInPartitionedThreads( historicalIds, - partition -> myExpungeDaoService.expungeHistoricalVersions( + partition -> myResourceExpungeService.expungeHistoricalVersions( myRequestDetails, partition, myRemainingCount)); } + private PartitionAwareSupplier getPartitionAwareSupplier() { + return new PartitionAwareSupplier(myTxService, myRequestDetails); + } + private PartitionRunner getPartitionRunner() { return new PartitionRunner( PROCESS_NAME, @@ -144,7 +148,7 @@ public class ExpungeOperation implements Callable { getPartitionRunner() .runInPartitionedThreads( theResourceIds, - partition -> myExpungeDaoService.expungeCurrentVersionOfResources( + partition -> myResourceExpungeService.expungeCurrentVersionOfResources( myRequestDetails, partition, myRemainingCount)); } @@ -152,11 +156,26 @@ public class ExpungeOperation implements Callable { getPartitionRunner() .runInPartitionedThreads( theResourceIds, - partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds( + partition -> myResourceExpungeService.expungeHistoricalVersionsOfIds( myRequestDetails, partition, myRemainingCount)); } private ExpungeOutcome expungeOutcome() { return new ExpungeOutcome().setDeletedCount(myExpungeOptions.getLimit() - myRemainingCount.get()); } + + @VisibleForTesting + public void setHapiTransactionServiceForTesting(HapiTransactionService theHapiTransactionService) { + myTxService = theHapiTransactionService; + } + + @VisibleForTesting + public void setStorageSettingsForTesting(JpaStorageSettings theStorageSettings) { + myStorageSettings = theStorageSettings; + } + + @VisibleForTesting + public void setExpungeDaoServiceForTesting(IResourceExpungeService theIResourceExpungeService) { + myResourceExpungeService = theIResourceExpungeService; + } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/PartitionAwareSupplier.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/PartitionAwareSupplier.java new file mode 100644 index 00000000000..156690acc0f --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/expunge/PartitionAwareSupplier.java @@ -0,0 +1,27 @@ +package ca.uhn.fhir.jpa.dao.expunge; + +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.api.server.RequestDetails; + +import java.util.function.Supplier; +import javax.validation.constraints.NotNull; + +/** + * Utility class wrapping a supplier in a transaction with the purpose of performing the supply operation with a + * partitioned aware context. + */ +public class PartitionAwareSupplier { + private final HapiTransactionService myTransactionService; + private final RequestDetails myRequestDetails; + + @NotNull + public PartitionAwareSupplier(HapiTransactionService theTxService, RequestDetails theRequestDetails) { + myTransactionService = theTxService; + myRequestDetails = theRequestDetails; + } + + @NotNull + public T supplyInPartitionedContext(Supplier theResourcePersistentIdSupplier) { + return myTransactionService.withRequest(myRequestDetails).execute(tx -> theResourcePersistentIdSupplier.get()); + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index a54a370a982..50d4669be68 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -483,6 +483,11 @@ public class HapiTransactionService implements IHapiTransactionService { public RequestPartitionId getRequestPartitionIdForTesting() { return myRequestPartitionId; } + + @VisibleForTesting + public RequestDetails getRequestDetailsForTesting() { + return myRequestDetails; + } } /**