From 208684fc6fb152ec4827ff1c566b92c09fc16b54 Mon Sep 17 00:00:00 2001 From: Martha Date: Fri, 28 Jun 2024 12:46:32 -0700 Subject: [PATCH] Add support for RequestPartitionId.allPartitions as input to the IJobPartitionProvider. Add tests and fix one test. Add missing changelog. --- ...ing-reindex-job-across-all-partitions.yaml | 7 ++ .../fhir/jpa/batch2/JobPartitionProvider.java | 42 ++++++++++ .../uhn/fhir/jpa/batch2/JpaBatch2Config.java | 9 +++ .../jpa/batch2/JobPartitionProviderTest.java | 76 +++++++++++++++++++ .../bulk/imprt2/ConsumeFilesStepR4Test.java | 20 ++--- .../job/ReindexJobWithPartitioningTest.java | 28 +++++-- .../fhir/batch2/config/BaseBatch2Config.java | 8 -- .../coordinator/JobPartitionProvider.java | 25 ------ .../coordinator/JobPartitionProviderTest.java | 42 ---------- 9 files changed, 167 insertions(+), 90 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6008-support-for-running-reindex-job-across-all-partitions.yaml create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobPartitionProvider.java create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JobPartitionProviderTest.java delete mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java delete mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProviderTest.java diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6008-support-for-running-reindex-job-across-all-partitions.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6008-support-for-running-reindex-job-across-all-partitions.yaml new file mode 100644 index 00000000000..b67f90d140a --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6008-support-for-running-reindex-job-across-all-partitions.yaml @@ -0,0 +1,7 @@ +--- +type: add +issue: 6008 +title: "Previously, 
when partitioning is enabled, the reindex job could only be run against one partition. +The reindex job parameters can now accept a list of partitions or all partitions (list of RequestPartitionId or RequestPartitionId.allPartitions). +In the future, the functionality can be extended to other bulk operations run via batch2 jobs (e.g. $delete-expunge, $export). +" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobPartitionProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobPartitionProvider.java new file mode 100644 index 00000000000..a133bde7a1a --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobPartitionProvider.java @@ -0,0 +1,42 @@ +package ca.uhn.fhir.jpa.batch2; + +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.entity.PartitionEntity; +import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.RequestDetails; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * The default implementation, which uses {@link IRequestPartitionHelperSvc} and {@link IPartitionLookupSvc} to compute the partition to run a batch2 job. + * The latter will be used to handle cases when the job is configured to run against all partitions (bulk system operation). 
+ */ +public class JobPartitionProvider implements IJobPartitionProvider { + private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + private final IPartitionLookupSvc myPartitionLookupSvc; + + public JobPartitionProvider( + IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) { + myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; + myPartitionLookupSvc = thePartitionLookupSvc; + } + + @Override + public List getPartitions(RequestDetails theRequestDetails, String theOperation) { + RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation( + theRequestDetails, theOperation); + if (!partitionId.isAllPartitions()) { + return List.of(partitionId); + } + // handle (bulk) system operations that are typically configured with RequestPartitionId.allPartitions() + // populate the actual list of all partitions + List partitionIdList = myPartitionLookupSvc.listPartitions().stream() + .map(PartitionEntity::toRequestPartitionId) + .collect(Collectors.toList()); + partitionIdList.add(RequestPartitionId.defaultPartition()); + return partitionIdList; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java index f73a88570f3..bdc9c2d45bf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaBatch2Config.java @@ -19,6 +19,7 @@ */ package ca.uhn.fhir.jpa.batch2; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.config.BaseBatch2Config; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; @@ -27,6 +28,8 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository; 
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import jakarta.persistence.EntityManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -52,4 +55,10 @@ public class JpaBatch2Config extends BaseBatch2Config { theEntityManager, theInterceptorBroadcaster); } + + @Bean + public IJobPartitionProvider jobPartitionProvider( + IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) { + return new JobPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc); + } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JobPartitionProviderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JobPartitionProviderTest.java new file mode 100644 index 00000000000..aab9c65a3c4 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JobPartitionProviderTest.java @@ -0,0 +1,76 @@ +package ca.uhn.fhir.jpa.batch2; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.entity.PartitionEntity; +import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatchers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class 
JobPartitionProviderTest { + @Mock + private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + @Mock + private IPartitionLookupSvc myPartitionLookupSvc; + @InjectMocks + private JobPartitionProvider myJobPartitionProvider; + + @Test + public void getPartitions_requestSpecificPartition_returnsPartition() { + // setup + SystemRequestDetails requestDetails = new SystemRequestDetails(); + String operation = ProviderConstants.OPERATION_EXPORT; + + RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1); + when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId); + + // test + List partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation); + + // verify + Assertions.assertThat(partitionIds).hasSize(1); + Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId); + } + + @Test + public void getPartitions_requestAllPartitions_returnsListOfAllSpecificPartitions() { + // setup + SystemRequestDetails requestDetails = new SystemRequestDetails(); + String operation = ProviderConstants.OPERATION_EXPORT; + + when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))) + .thenReturn( RequestPartitionId.allPartitions()); + List partitionIds = List.of(RequestPartitionId.fromPartitionIds(1), RequestPartitionId.fromPartitionIds(2)); + + List partitionEntities = new ArrayList<>(); + partitionIds.forEach(partitionId -> { + PartitionEntity entity = mock(PartitionEntity.class); + when(entity.toRequestPartitionId()).thenReturn(partitionId); + partitionEntities.add(entity); + }); + when(myPartitionLookupSvc.listPartitions()).thenReturn(partitionEntities); + List expectedPartitionIds = new ArrayList<>(partitionIds); + expectedPartitionIds.add(RequestPartitionId.defaultPartition()); + + // test + List actualPartitionIds = 
myJobPartitionProvider.getPartitions(requestDetails, operation); + + // verify + Assertions.assertThat(actualPartitionIds).hasSize(expectedPartitionIds.size()); + Assertions.assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds.toArray(new RequestPartitionId[0])); + } +} \ No newline at end of file diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/ConsumeFilesStepR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/ConsumeFilesStepR4Test.java index 7e44fbdaf67..75aa29e41d3 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/ConsumeFilesStepR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/imprt2/ConsumeFilesStepR4Test.java @@ -1,8 +1,5 @@ package ca.uhn.fhir.jpa.bulk.imprt2; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertFalse; import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.jobs.imprt.ConsumeFilesStep; import ca.uhn.fhir.interceptor.model.RequestPartitionId; @@ -24,6 +21,9 @@ import java.util.List; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @TestMethodOrder(MethodOrderer.MethodName.class) @@ -141,15 +141,17 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test { // Validate - if (partitionEnabled) { - assertEquals(8, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); - } else { - assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); - } + int expectedSelectQueryCount = partitionEnabled ? 
8 : 6; + assertEquals(expectedSelectQueryCount, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(4, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); - assertEquals(1, myCaptureQueriesListener.countCommits()); + + // PartitionLookupSvcImpl#lookupPartitionByName generates one additional commit + // because it executes in a transaction (calls executeInTransaction) + // we may want to change that in the future + int expectedCommitCount = partitionEnabled ? 2 : 1; + assertEquals(expectedCommitCount, myCaptureQueriesListener.countCommits()); assertEquals(0, myCaptureQueriesListener.countRollbacks()); patient = myPatientDao.read(new IdType("Patient/A"), mySrd); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java index 311641f0a06..d2641ba753b 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobWithPartitioningTest.java @@ -42,6 +42,7 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1); RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2); + RequestPartitionId defaultPartition = RequestPartitionId.defaultPartition(); Observation observation1 = buildResource("Observation", withStatus(Observation.ObservationStatus.FINAL.toCode())); myObservationDao.create(observation1, new SystemRequestDetails().setRequestPartitionId(partition1)); @@ -54,6 +55,8 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { myPatientDao.create(patient1, new 
SystemRequestDetails().setRequestPartitionId(partition1)); Patient patient2 = buildResource("Patient", withActiveFalse()); myPatientDao.create(patient2, new SystemRequestDetails().setRequestPartitionId(partition2)); + Patient patient3 = buildResource("Patient", withActiveFalse()); + myPatientDao.create(patient3, new SystemRequestDetails().setRequestPartitionId(defaultPartition)); } @AfterEach @@ -63,17 +66,30 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test { } public static Stream getReindexParameters() { - List allPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2)); + List twoPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2)); List partition1 = List.of(RequestPartitionId.fromPartitionId(1)); + List allPartitions = List.of(RequestPartitionId.allPartitions()); return Stream.of( - Arguments.of(List.of(), List.of(), false, 5), + // includes all resources from all partitions - partition 1, partition 2 and default partition + Arguments.of(List.of(), List.of(), false, 6), + // includes all Observations + Arguments.of(List.of("Observation?"), twoPartitions, false, 3), + // includes all Observations Arguments.of(List.of("Observation?"), allPartitions, false, 3), Arguments.of(List.of("Observation?"), List.of(), false, 0), + // includes Observations in partition 1 Arguments.of(List.of("Observation?"), partition1, true, 2), - Arguments.of(List.of("Observation?", "Patient?"), allPartitions, false, 5), - Arguments.of(List.of("Observation?", "Patient?"), allPartitions, true, 3), - Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, false, 4), - Arguments.of(List.of("Observation?status=final", "Patient?"), allPartitions, true, 2), + // includes all Patients from all partitions - partition 1, partition 2 and default partition + Arguments.of(List.of("Patient?"), allPartitions, false, 3), + // includes Patients and Observations in partitions 1 
and 2 + Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, false, 5), + // includes Observations from partition 1 and Patients from partition 2 + Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, true, 3), + // includes final Observations and Patients from partitions 1 and 2 + Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, false, 4), + // includes final Observations from partition 1 and Patients from partition 2 + Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, true, 2), + // includes final Observations and Patients from partition 1 Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2) ); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index d403ee0f287..d2bff951f83 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -21,13 +21,11 @@ package ca.uhn.fhir.batch2.config; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobMaintenanceService; -import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; -import ca.uhn.fhir.batch2.coordinator.JobPartitionProvider; import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl; import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl; @@ -35,7 +33,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import 
ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; -import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; @@ -142,9 +139,4 @@ public abstract class BaseBatch2Config { protected int getConcurrentConsumers() { return 4; } - - @Bean - public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { - return new JobPartitionProvider(theRequestPartitionHelperSvc); - } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java deleted file mode 100644 index 88dc0cfc4a1..00000000000 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProvider.java +++ /dev/null @@ -1,25 +0,0 @@ -package ca.uhn.fhir.batch2.coordinator; - -import ca.uhn.fhir.batch2.api.IJobPartitionProvider; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; -import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; -import ca.uhn.fhir.rest.api.server.RequestDetails; - -import java.util.List; - -/** - * The default implementation, which uses {@link IRequestPartitionHelperSvc} to compute the partition to run a batch2 job. 
- */ -public class JobPartitionProvider implements IJobPartitionProvider { - private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; - - public JobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { - myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; - } - - @Override - public List getPartitions(RequestDetails theRequestDetails, String theOperation) { - return List.of(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation( - theRequestDetails, theOperation)); - } -} diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProviderTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProviderTest.java deleted file mode 100644 index da03c869de9..00000000000 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobPartitionProviderTest.java +++ /dev/null @@ -1,42 +0,0 @@ -package ca.uhn.fhir.batch2.coordinator; - -import ca.uhn.fhir.interceptor.model.RequestPartitionId; -import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; -import ca.uhn.fhir.rest.api.server.SystemRequestDetails; -import ca.uhn.fhir.rest.server.provider.ProviderConstants; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class JobPartitionProviderTest { - @Mock - private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; - @InjectMocks - private JobPartitionProvider myJobPartitionProvider; - - @Test - public void getPartitions_requestSpecificPartition_returnsPartition() { - // setup - SystemRequestDetails requestDetails = new SystemRequestDetails(); - 
String operation = ProviderConstants.OPERATION_EXPORT; - - RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1); - when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(eq(requestDetails), eq(operation))).thenReturn(partitionId); - - // test - List partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation); - - // verify - assertThat(partitionIds).hasSize(1); - assertThat(partitionIds).containsExactlyInAnyOrder(partitionId); - } -} \ No newline at end of file