Add support for RequestPartitionId.allPartitions as input to IJobPartitionProvider. Add tests and fix one test. Add missing changelog.

Martha 2024-06-28 12:46:32 -07:00
parent 318ff95baf
commit 208684fc6f
9 changed files with 167 additions and 90 deletions

View File

@@ -0,0 +1,7 @@
---
type: add
issue: 6008
title: "Previously, when partitioning is enabled, the reindex job could only be run against one partition.
The reindex job parameters can now accept a list of partitions or all partitions (list of RequestPartitionId or RequestPartitionId.allPartitions).
In the future, the functionality can be extended to other bulk operations run via batch2 jobs (e.g. $delete-expunge, $export).
"

View File

@@ -0,0 +1,42 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
import java.util.stream.Collectors;
/**
 * The default implementation, which uses {@link IRequestPartitionHelperSvc} and {@link IPartitionLookupSvc}
 * to compute the partition(s) against which a batch2 job runs.
 * The latter is used to handle cases when the job is configured to run against all partitions (bulk system operation).
 */
public class JobPartitionProvider implements IJobPartitionProvider {
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private final IPartitionLookupSvc myPartitionLookupSvc;
public JobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
myPartitionLookupSvc = thePartitionLookupSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
if (!partitionId.isAllPartitions()) {
return List.of(partitionId);
}
// handle (bulk) system operations that are typically configured with RequestPartitionId.allPartitions()
// populate the actual list of all partitions
List<RequestPartitionId> partitionIdList = myPartitionLookupSvc.listPartitions().stream()
.map(PartitionEntity::toRequestPartitionId)
.collect(Collectors.toList());
partitionIdList.add(RequestPartitionId.defaultPartition());
return partitionIdList;
}
}
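A hedged usage sketch for the provider above: callers ask for the partitions once, then fan the job out per partition. Only getPartitions(...) comes from this commit; the surrounding loop and the startJobForPartition helper are hypothetical illustrations:

List<RequestPartitionId> partitions =
		myJobPartitionProvider.getPartitions(theRequestDetails, ProviderConstants.OPERATION_EXPORT);
// a request scoped to one partition yields a singleton list; a request made with
// RequestPartitionId.allPartitions() yields every known partition plus the default partition
for (RequestPartitionId partition : partitions) {
	startJobForPartition(partition); // hypothetical helper, e.g. submit one batch2 job per partition
}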

View File

@@ -19,6 +19,7 @@
 */
package ca.uhn.fhir.jpa.batch2;
+import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
@@ -27,6 +28,8 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
+import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
+import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import jakarta.persistence.EntityManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -52,4 +55,10 @@ public class JpaBatch2Config extends BaseBatch2Config {
				theEntityManager,
				theInterceptorBroadcaster);
	}
+
+	@Bean
+	public IJobPartitionProvider jobPartitionProvider(
+			IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
+		return new JobPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc);
+	}
}
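Because the provider is now exposed as a Spring bean, a downstream configuration could in principle supply its own partition-selection strategy. A minimal sketch assuming standard Spring bean overriding; the class and the selection policy below are hypothetical, not part of this commit:

@Configuration
public class CustomBatch2PartitionConfig {
	// @Primary makes this bean win over the default jobPartitionProvider above
	@Bean
	@Primary
	public IJobPartitionProvider customJobPartitionProvider(IPartitionLookupSvc thePartitionLookupSvc) {
		return new IJobPartitionProvider() {
			@Override
			public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
				// hypothetical policy: always run against every known partition, ignoring the request
				return thePartitionLookupSvc.listPartitions().stream()
						.map(PartitionEntity::toRequestPartitionId)
						.collect(Collectors.toList());
			}
		};
	}
}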

View File

@@ -0,0 +1,76 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IPartitionLookupSvc myPartitionLookupSvc;
@InjectMocks
private JobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List<RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
@Test
public void getPartitions_requestAllPartitions_returnsListOfAllSpecificPartitions() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation)))
.thenReturn(RequestPartitionId.allPartitions());
List<RequestPartitionId> partitionIds = List.of(RequestPartitionId.fromPartitionIds(1), RequestPartitionId.fromPartitionIds(2));
List<PartitionEntity> partitionEntities = new ArrayList<>();
partitionIds.forEach(partitionId -> {
PartitionEntity entity = mock(PartitionEntity.class);
when(entity.toRequestPartitionId()).thenReturn(partitionId);
partitionEntities.add(entity);
});
when(myPartitionLookupSvc.listPartitions()).thenReturn(partitionEntities);
List<RequestPartitionId> expectedPartitionIds = new ArrayList<>(partitionIds);
expectedPartitionIds.add(RequestPartitionId.defaultPartition());
// test
List<RequestPartitionId> actualPartitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(actualPartitionIds).hasSize(expectedPartitionIds.size());
Assertions.assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds.toArray(new RequestPartitionId[0]));
}
}

View File

@@ -1,8 +1,5 @@
package ca.uhn.fhir.jpa.bulk.imprt2;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.jobs.imprt.ConsumeFilesStep;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@@ -24,6 +21,9 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@TestMethodOrder(MethodOrderer.MethodName.class)
@@ -141,15 +141,17 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
		// Validate
-		if (partitionEnabled) {
-			assertEquals(8, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
-		} else {
-			assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
-		}
+		int expectedSelectQueryCount = partitionEnabled ? 8 : 6;
+		assertEquals(expectedSelectQueryCount, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
		assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
		assertEquals(4, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
		assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
-		assertEquals(1, myCaptureQueriesListener.countCommits());
+		// PartitionLookupSvcImpl#lookupPartitionByName generates one additional commit
+		// because it executes in a transaction (calls executeInTransaction);
+		// we may want to change that in the future
+		int expectedCommitCount = partitionEnabled ? 2 : 1;
+		assertEquals(expectedCommitCount, myCaptureQueriesListener.countCommits());
		assertEquals(0, myCaptureQueriesListener.countRollbacks());

		patient = myPatientDao.read(new IdType("Patient/A"), mySrd);
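For context on the new comment: wrapping even a read-only lookup in a transaction produces a commit, which is why enabling partitioning bumps the expected commit count from 1 to 2. A generic Spring sketch of that pattern, not HAPI's actual implementation (doLookup and myTransactionManager are hypothetical placeholders):

private PartitionEntity lookupPartitionByName(String theName) {
	TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
	// execute(...) begins a transaction and commits it on success, so each call
	// contributes one commit to the query listener's count even though it only reads
	return txTemplate.execute(status -> doLookup(theName));
}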

View File

@@ -42,6 +42,7 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
		RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
		RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
+		RequestPartitionId defaultPartition = RequestPartitionId.defaultPartition();

		Observation observation1 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode()));
		myObservationDao.create(observation1, new SystemRequestDetails().setRequestPartitionId(partition1));
@@ -54,6 +55,8 @@
		myPatientDao.create(patient1, new SystemRequestDetails().setRequestPartitionId(partition1));
		Patient patient2 = buildResource("Patient", withActiveFalse());
		myPatientDao.create(patient2, new SystemRequestDetails().setRequestPartitionId(partition2));
+		Patient patient3 = buildResource("Patient", withActiveFalse());
+		myPatientDao.create(patient3, new SystemRequestDetails().setRequestPartitionId(defaultPartition));
	}

	@AfterEach
@@ -63,17 +66,30 @@
	}

	public static Stream<Arguments> getReindexParameters() {
-		List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
+		List<RequestPartitionId> twoPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
		List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
+		List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.allPartitions());
		return Stream.of(
-			Arguments.of(List.of(), List.of(), false, 5),
+			// includes all resources from all partitions - partition 1, partition 2 and the default partition
+			Arguments.of(List.of(), List.of(), false, 6),
+			// includes all Observations
+			Arguments.of(List.of("Observation?"), twoPartitions, false, 3),
+			// includes all Observations
			Arguments.of(List.of("Observation?"), allPartitions, false, 3),
			Arguments.of(List.of("Observation?"), List.of(), false, 0),
+			// includes Observations in partition 1
			Arguments.of(List.of("Observation?"), partition1, true, 2),
-			Arguments.of(List.of("Observation?", "Patient?"), allPartitions, false, 5),
-			Arguments.of(List.of("Observation?", "Patient?"), allPartitions, true, 3),
-			Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, false, 4),
-			Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, true, 2),
+			// includes all Patients from all partitions - partition 1, partition 2 and the default partition
+			Arguments.of(List.of("Patient?"), allPartitions, false, 3),
+			// includes Patients and Observations in partitions 1 and 2
+			Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, false, 5),
+			// includes Observations from partition 1 and Patients from partition 2
+			Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, true, 3),
+			// includes final Observations and Patients from partitions 1 and 2
+			Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, false, 4),
+			// includes final Observations from partition 1 and Patients from partition 2
+			Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, true, 2),
+			// includes final Observations and Patients from partition 1
			Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2)
		);
	}

View File

@@ -21,13 +21,11 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
-import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
-import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
@@ -35,7 +33,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
-import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@@ -142,9 +139,4 @@ public abstract class BaseBatch2Config {
	protected int getConcurrentConsumers() {
		return 4;
	}
-
-	@Bean
-	public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
-		return new JobPartitionProvider(theRequestPartitionHelperSvc);
-	}
}

View File

@@ -1,25 +0,0 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* The default implementation, which uses {@link IRequestPartitionHelperSvc} to compute the partition to run a batch2 job.
*/
public class JobPartitionProvider implements IJobPartitionProvider {
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public JobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
return List.of(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation));
}
}

View File

@@ -1,42 +0,0 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@InjectMocks
private JobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(eq(requestDetails), eq(operation))).thenReturn(partitionId);
// test
List <RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
assertThat(partitionIds).hasSize(1);
assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
}