5344 $expunge operation fails when the operation is executed on a specific partition (#5347)
* initial failing test. * solution with changelog * fixing pom dependency version * fixing pom circular dependency issue and making the new wrapper class generic. * Fixing tests assertion. * Moving test to solve dependency issues. * addressing code review comments. --------- Co-authored-by: peartree <etienne.poirier@smilecdr.com>
This commit is contained in:
parent
743e2c178b
commit
e5f700fc21
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 5344
|
||||||
|
jira: SMILE-7324
|
||||||
|
title: "Previously, issuing an expunge operation for resources on a specific partition would fail. This problem has been fixed."
|
|
@ -0,0 +1,91 @@
|
||||||
|
package ca.uhn.fhir.jpa.dao.tx;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
|
||||||
|
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
|
||||||
|
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
|
||||||
|
import ca.uhn.fhir.jpa.dao.expunge.IResourceExpungeService;
|
||||||
|
import ca.uhn.fhir.jpa.model.dao.JpaPid;
|
||||||
|
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Captor;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Spy;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class ExpungeOperationTest {
|
||||||
|
|
||||||
|
@Captor
|
||||||
|
private ArgumentCaptor<HapiTransactionService.ExecutionBuilder> myBuilderArgumentCaptor;
|
||||||
|
@Spy
|
||||||
|
private MockHapiTransactionService myHapiTransactionService;
|
||||||
|
private JpaStorageSettings myStorageSettings;
|
||||||
|
@Mock
|
||||||
|
private IResourceExpungeService myIResourceExpungeService;
|
||||||
|
private static final String ourExpectedTenantId = "TenantA";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void beforeEach(){
|
||||||
|
myStorageSettings = new JpaStorageSettings();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpunge_onSpecificTenant_willPerformExpungeOnSpecificTenant(){
|
||||||
|
// given
|
||||||
|
when(myIResourceExpungeService.findHistoricalVersionsOfDeletedResources(any(), any(), anyInt())).thenReturn(List.of(JpaPid.fromId(1l)));
|
||||||
|
when(myIResourceExpungeService.findHistoricalVersionsOfNonDeletedResources(any(), any(), anyInt())).thenReturn(List.of(JpaPid.fromId(1l)));
|
||||||
|
myStorageSettings.setExpungeBatchSize(5);
|
||||||
|
|
||||||
|
RequestDetails requestDetails = getRequestDetails();
|
||||||
|
ExpungeOptions expungeOptions = new ExpungeOptions().setExpungeDeletedResources(true).setExpungeOldVersions(true);
|
||||||
|
|
||||||
|
ExpungeOperation expungeOperation = new ExpungeOperation("Patient", null, expungeOptions, requestDetails);
|
||||||
|
|
||||||
|
expungeOperation.setHapiTransactionServiceForTesting(myHapiTransactionService);
|
||||||
|
expungeOperation.setStorageSettingsForTesting(myStorageSettings);
|
||||||
|
expungeOperation.setExpungeDaoServiceForTesting(myIResourceExpungeService);
|
||||||
|
|
||||||
|
expungeOperation.call();
|
||||||
|
|
||||||
|
// then
|
||||||
|
assertTransactionServiceWasInvokedWithTenantId(ourExpectedTenantId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTransactionServiceWasInvokedWithTenantId(String theExpectedTenantId) {
|
||||||
|
// we have set the expungeOptions to setExpungeDeletedResources and SetExpungeOldVersions to true.
|
||||||
|
// as a result, we will be making 5 trips to the db. let's make sure that each trip was done with
|
||||||
|
// the hapiTransaction service and that the tenantId was specified.
|
||||||
|
verify(myHapiTransactionService, times(5)).doExecute(myBuilderArgumentCaptor.capture(), any());
|
||||||
|
List<HapiTransactionService.ExecutionBuilder> methodArgumentExecutionBuilders = myBuilderArgumentCaptor.getAllValues();
|
||||||
|
|
||||||
|
boolean allMatching = methodArgumentExecutionBuilders.stream()
|
||||||
|
.map(HapiTransactionService.ExecutionBuilder::getRequestDetailsForTesting)
|
||||||
|
.map(RequestDetails::getTenantId)
|
||||||
|
.allMatch(theExpectedTenantId::equals);
|
||||||
|
|
||||||
|
assertThat(allMatching, is(equalTo(true)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RequestDetails getRequestDetails() {
|
||||||
|
RequestDetails requestDetails = new ServletRequestDetails();
|
||||||
|
requestDetails.setTenantId(ourExpectedTenantId);
|
||||||
|
return requestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package ca.uhn.fhir.jpa.dao.tx;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.dao.expunge.PartitionAwareSupplier;
|
||||||
|
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Captor;
|
||||||
|
import org.mockito.Spy;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class PartitionAwareSupplierTest {
|
||||||
|
|
||||||
|
@Spy
|
||||||
|
private MockHapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
@Captor
|
||||||
|
private ArgumentCaptor<HapiTransactionService.ExecutionBuilder> myBuilderArgumentCaptor;
|
||||||
|
|
||||||
|
private static final String ourExpectedTenantId = "TenantA";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMethodFindInPartitionedContext_withRequestDetailsHavingTenantId_willExecuteOnSpecifiedPartition(){
|
||||||
|
RequestDetails requestDetails = getRequestDetails();
|
||||||
|
|
||||||
|
PartitionAwareSupplier partitionAwareSupplier = new PartitionAwareSupplier(myHapiTransactionService, requestDetails);
|
||||||
|
partitionAwareSupplier.supplyInPartitionedContext(getResourcePersistentIdSupplier());
|
||||||
|
|
||||||
|
assertTransactionServiceWasInvokedWithTenantId(ourExpectedTenantId);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Supplier<List<IResourcePersistentId>> getResourcePersistentIdSupplier(){
|
||||||
|
return () -> Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTransactionServiceWasInvokedWithTenantId(String theExpectedTenantId) {
|
||||||
|
verify(myHapiTransactionService, times(1)).doExecute(myBuilderArgumentCaptor.capture(), any());
|
||||||
|
HapiTransactionService.ExecutionBuilder methodArgumentExecutionBuilder = myBuilderArgumentCaptor.getValue();
|
||||||
|
|
||||||
|
String requestDetailsTenantId = methodArgumentExecutionBuilder.getRequestDetailsForTesting().getTenantId();
|
||||||
|
|
||||||
|
assertThat(requestDetailsTenantId, is(equalTo(theExpectedTenantId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RequestDetails getRequestDetails() {
|
||||||
|
RequestDetails requestDetails = new ServletRequestDetails();
|
||||||
|
requestDetails.setTenantId(ourExpectedTenantId);
|
||||||
|
return requestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -40,7 +40,7 @@ public class MockHapiTransactionService extends HapiTransactionService {
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
@Override
|
@Override
|
||||||
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
|
public <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
|
||||||
return theCallback.doInTransaction(myTransactionStatus);
|
return theCallback.doInTransaction(myTransactionStatus);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
|
||||||
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -43,7 +44,7 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
public static final String THREAD_PREFIX = "expunge";
|
public static final String THREAD_PREFIX = "expunge";
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private IResourceExpungeService myExpungeDaoService;
|
private IResourceExpungeService myResourceExpungeService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private JpaStorageSettings myStorageSettings;
|
private JpaStorageSettings myStorageSettings;
|
||||||
|
@ -101,17 +102,14 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<IResourcePersistentId> findHistoricalVersionsOfDeletedResources() {
|
private List<IResourcePersistentId> findHistoricalVersionsOfDeletedResources() {
|
||||||
List<IResourcePersistentId> retVal = myExpungeDaoService.findHistoricalVersionsOfDeletedResources(
|
List<IResourcePersistentId> retVal = getPartitionAwareSupplier()
|
||||||
myResourceName, myResourceId, myRemainingCount.get());
|
.supplyInPartitionedContext(() -> myResourceExpungeService.findHistoricalVersionsOfDeletedResources(
|
||||||
|
myResourceName, myResourceId, myRemainingCount.get()));
|
||||||
|
|
||||||
ourLog.debug("Found {} historical versions", retVal.size());
|
ourLog.debug("Found {} historical versions", retVal.size());
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<IResourcePersistentId> findHistoricalVersionsOfNonDeletedResources() {
|
|
||||||
return myExpungeDaoService.findHistoricalVersionsOfNonDeletedResources(
|
|
||||||
myResourceName, myResourceId, myRemainingCount.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean expungeLimitReached() {
|
private boolean expungeLimitReached() {
|
||||||
boolean expungeLimitReached = myRemainingCount.get() <= 0;
|
boolean expungeLimitReached = myRemainingCount.get() <= 0;
|
||||||
if (expungeLimitReached) {
|
if (expungeLimitReached) {
|
||||||
|
@ -121,15 +119,21 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expungeOldVersions() {
|
private void expungeOldVersions() {
|
||||||
List<IResourcePersistentId> historicalIds = findHistoricalVersionsOfNonDeletedResources();
|
List<IResourcePersistentId> historicalIds = getPartitionAwareSupplier()
|
||||||
|
.supplyInPartitionedContext(() -> myResourceExpungeService.findHistoricalVersionsOfNonDeletedResources(
|
||||||
|
myResourceName, myResourceId, myRemainingCount.get()));
|
||||||
|
|
||||||
getPartitionRunner()
|
getPartitionRunner()
|
||||||
.runInPartitionedThreads(
|
.runInPartitionedThreads(
|
||||||
historicalIds,
|
historicalIds,
|
||||||
partition -> myExpungeDaoService.expungeHistoricalVersions(
|
partition -> myResourceExpungeService.expungeHistoricalVersions(
|
||||||
myRequestDetails, partition, myRemainingCount));
|
myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private PartitionAwareSupplier getPartitionAwareSupplier() {
|
||||||
|
return new PartitionAwareSupplier(myTxService, myRequestDetails);
|
||||||
|
}
|
||||||
|
|
||||||
private PartitionRunner getPartitionRunner() {
|
private PartitionRunner getPartitionRunner() {
|
||||||
return new PartitionRunner(
|
return new PartitionRunner(
|
||||||
PROCESS_NAME,
|
PROCESS_NAME,
|
||||||
|
@ -144,7 +148,7 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
getPartitionRunner()
|
getPartitionRunner()
|
||||||
.runInPartitionedThreads(
|
.runInPartitionedThreads(
|
||||||
theResourceIds,
|
theResourceIds,
|
||||||
partition -> myExpungeDaoService.expungeCurrentVersionOfResources(
|
partition -> myResourceExpungeService.expungeCurrentVersionOfResources(
|
||||||
myRequestDetails, partition, myRemainingCount));
|
myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,11 +156,26 @@ public class ExpungeOperation implements Callable<ExpungeOutcome> {
|
||||||
getPartitionRunner()
|
getPartitionRunner()
|
||||||
.runInPartitionedThreads(
|
.runInPartitionedThreads(
|
||||||
theResourceIds,
|
theResourceIds,
|
||||||
partition -> myExpungeDaoService.expungeHistoricalVersionsOfIds(
|
partition -> myResourceExpungeService.expungeHistoricalVersionsOfIds(
|
||||||
myRequestDetails, partition, myRemainingCount));
|
myRequestDetails, partition, myRemainingCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ExpungeOutcome expungeOutcome() {
|
private ExpungeOutcome expungeOutcome() {
|
||||||
return new ExpungeOutcome().setDeletedCount(myExpungeOptions.getLimit() - myRemainingCount.get());
|
return new ExpungeOutcome().setDeletedCount(myExpungeOptions.getLimit() - myRemainingCount.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setHapiTransactionServiceForTesting(HapiTransactionService theHapiTransactionService) {
|
||||||
|
myTxService = theHapiTransactionService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setStorageSettingsForTesting(JpaStorageSettings theStorageSettings) {
|
||||||
|
myStorageSettings = theStorageSettings;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setExpungeDaoServiceForTesting(IResourceExpungeService theIResourceExpungeService) {
|
||||||
|
myResourceExpungeService = theIResourceExpungeService;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
package ca.uhn.fhir.jpa.dao.expunge;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class wrapping a supplier in a transaction with the purpose of performing the supply operation with a
|
||||||
|
* partitioned aware context.
|
||||||
|
*/
|
||||||
|
public class PartitionAwareSupplier {
|
||||||
|
private final HapiTransactionService myTransactionService;
|
||||||
|
private final RequestDetails myRequestDetails;
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
public PartitionAwareSupplier(HapiTransactionService theTxService, RequestDetails theRequestDetails) {
|
||||||
|
myTransactionService = theTxService;
|
||||||
|
myRequestDetails = theRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
public <T> T supplyInPartitionedContext(Supplier<T> theResourcePersistentIdSupplier) {
|
||||||
|
return myTransactionService.withRequest(myRequestDetails).execute(tx -> theResourcePersistentIdSupplier.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -483,6 +483,11 @@ public class HapiTransactionService implements IHapiTransactionService {
|
||||||
public RequestPartitionId getRequestPartitionIdForTesting() {
|
public RequestPartitionId getRequestPartitionIdForTesting() {
|
||||||
return myRequestPartitionId;
|
return myRequestPartitionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public RequestDetails getRequestDetailsForTesting() {
|
||||||
|
return myRequestDetails;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue