Separate basic partition provider logic so it can be reused by Mongo. Update documentation.

This commit is contained in:
Martha 2024-06-28 15:25:07 -07:00
parent 208684fc6f
commit 650f35ce92
7 changed files with 84 additions and 11 deletions

View File

@ -130,14 +130,14 @@ Once enabled, HTTP Requests to the FHIR server must include the name of the part
POST www.example.com/fhir/Patient
```
With partitioning enabled, if we were to now create a patient in the `DEFAULT` paritition, the request would now look like this:
With partitioning enabled, if we were to now create a patient in the `P1` partition, the request would now look like this:
```
POST www.example.com/fhir/DEFAULT/Patient
POST www.example.com/fhir/P1/Patient
```
Failure to add a partition name to the request path will result in an error when multitenancy is enabled.
If a tenant name is not provided in the request path, the request will default the tenant and use will use the 'DEFAULT' partition.
# Limitations

View File

@ -59,6 +59,6 @@ public class JpaBatch2Config extends BaseBatch2Config {
@Bean
public IJobPartitionProvider jobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
return new JobPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc);
return new JpaPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc);
}
}

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
@ -14,13 +14,12 @@ import java.util.stream.Collectors;
* The default implementation, which uses {@link IRequestPartitionHelperSvc} and {@link IPartitionLookupSvc} to compute the partition to run a batch2 job.
* The ladder will be used to handle cases when the job is configured to run against all partitions (bulk system operation).
*/
public class JobPartitionProvider implements IJobPartitionProvider {
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public class JpaPartitionProvider extends JobPartitionProvider {
private final IPartitionLookupSvc myPartitionLookupSvc;
public JobPartitionProvider(
public JpaPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
super(theRequestPartitionHelperSvc);
myPartitionLookupSvc = thePartitionLookupSvc;
}

View File

@ -21,13 +21,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobPartitionProviderTest {
public class JpaJobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IPartitionLookupSvc myPartitionLookupSvc;
@InjectMocks
private JobPartitionProvider myJobPartitionProvider;
private JpaPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {

View File

@ -21,11 +21,13 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
@ -33,6 +35,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -139,4 +142,9 @@ public abstract class BaseBatch2Config {
protected int getConcurrentConsumers() {
return 4;
}
@Bean
public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return new JobPartitionProvider(theRequestPartitionHelperSvc);
}
}

View File

@ -0,0 +1,23 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
public class JobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public JobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
return List.of(partitionId);
}
}

View File

@ -0,0 +1,43 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@InjectMocks
private JobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List<RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
}