4610 enable partitioning in bulk export (#4682)

* Initial commit adding support for partition awareness to Bulk Export.

* Additional cleanup and remove references in tests to "all partitions".

* Additional cleanup and remove references in tests to "all partitions".

* More cleanup, test fixes and additional checking to validate partition accesses.

* Additional changes to ensure checking of partition during polling operations.

* Add change log.

* More cleanup.

* Change recommended in code review.

---------

Co-authored-by: ianmarshall <ian@simpatico.ai>
This commit is contained in:
IanMMarshall 2023-03-27 14:22:30 -04:00 committed by GitHub
parent 90da12fbdf
commit b2c0a48915
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 738 additions and 181 deletions

View File

@ -8,8 +8,10 @@ import ca.uhn.fhir.model.dstu2.resource.Observation;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.exceptions.FhirClientConnectionException;
import ca.uhn.fhir.util.XmlUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -31,10 +33,21 @@ public class BuiltJarDstu2IT {
@BeforeAll
public static void beforeClass() {
// Reset OutputFactory as this test creates custom OutputFactory
XmlUtil.resetOutputFactoryForTest();
System.setProperty(javax.xml.stream.XMLInputFactory.class.getName(), "FOO");
System.setProperty(javax.xml.stream.XMLOutputFactory.class.getName(), "FOO");
}
@AfterAll
public static void afterClass() {
// Clear environment settings to avoid leaking to later tests.
System.clearProperty(javax.xml.stream.XMLInputFactory.class.getName());
System.clearProperty(javax.xml.stream.XMLOutputFactory.class.getName());
// Reset OutputFactory as this test creates custom OutputFactory
XmlUtil.resetOutputFactoryForTest();
}
@Test
public void testParserXml() {

View File

@ -1927,4 +1927,11 @@ public class XmlUtil {
transformer.transform(new DOMSource(theElement), new StreamResult(buffer));
return buffer.toString();
}
/**
* FOR UNIT TESTS ONLY - Used to reset OutputFactory for test cases that customize OutputFactory
*/
public static void resetOutputFactoryForTest() {
ourOutputFactory = null;
}
}

View File

@ -0,0 +1,4 @@
---
type: change
issue: 4610
title: "Bulk export operations have been enhanced to be fully partition aware."

View File

@ -3,3 +3,4 @@ The database migration may take several minutes.
These changes will be applied automatically on first startup.
To avoid this delay on first startup, run the migration manually.
Bulk export behaviour is changing in this release such that Binary resources created as part of the response will now be created in the partition that the bulk export was requested rather than in the DEFAULT partition as was being done previously.

View File

@ -81,7 +81,7 @@ The criteria for determining the partition will depend on your use case. For exa
## Identify Partition for Read (Optional)
A hook against the [`Pointcut.STORAGE_PARTITION_IDENTIFY_READ`](/hapi-fhir/apidocs/hapi-fhir-base/ca/uhn/fhir/interceptor/api/Pointcut.html#STORAGE_PARTITION_IDENTIFY_READ) pointcut must be registered, and this hook method will be invoked every time a resource is created in order to determine the partition to assign the resource to.
A hook against the [`Pointcut.STORAGE_PARTITION_IDENTIFY_READ`](/hapi-fhir/apidocs/hapi-fhir-base/ca/uhn/fhir/interceptor/api/Pointcut.html#STORAGE_PARTITION_IDENTIFY_READ) pointcut must be registered, and this hook method will be invoked every time a resource is read in order to determine the partition to read the resource from.
As of HAPI FHIR 5.3.0, the *Identify Partition for Read* hook method may return multiple partition names or IDs. If more than one partition is identified, the server will search in all identified partitions.
@ -167,8 +167,6 @@ None of the limitations listed here are considered permanent. Over time the HAPI
* **Cross-partition History Operations are not supported**: It is not possible to perform a `_history` operation that spans all partitions (`_history` does work when applied to a single partition however).
* **Bulk Operations are not partition aware**: Bulk export operations will export data across all partitions.
* **Package Operations are not partition aware**: Package operations will only create, update and query resources in the default partition.
* **Advanced Elasticsearch indexing is not partition optimized**: The results are correctly partitioned, but the extended indexing is not optimized to account for partitions.

View File

@ -158,7 +158,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
filterBySpecificPatient(theParams, resourceType, patientSearchParam, map);
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, jobId);
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, null, RequestPartitionId.allPartitions());
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, null, theParams.getPartitionIdOrAllPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
@ -203,7 +203,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theJobId),
null,
RequestPartitionId.allPartitions());
theParams.getPartitionIdOrAllPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
@ -244,15 +244,17 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
}
private LinkedHashSet<JpaPid> getSingletonGroupList(ExportPIDIteratorParameters theParams) {
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), SystemRequestDetails.newSystemRequestAllPartitions());
JpaPid pidOrNull = myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), group);
RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()),
new SystemRequestDetails().setRequestPartitionId(partitionId));
JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
pids.add(pidOrNull);
return pids;
}
/**
* Get a ISearchBuilder for the given resource type this partition is responsible for.
* Get a ISearchBuilder for the given resource type.
*/
protected ISearchBuilder<JpaPid> getSearchBuilderForResourceType(String theResourceType) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
@ -314,15 +316,15 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
ourLog.info("While extracting patients from a group, we found {} patients.", ids.size());
ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", ")));
// Are bulk exports partition aware or care about partition at all? This does
List<JpaPid> pidsOrThrowException = members;
LinkedHashSet<JpaPid> patientPidsToExport = new LinkedHashSet<>(pidsOrThrowException);
if (theParameters.isExpandMdm()) {
SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
RequestPartitionId partitionId = theParameters.getPartitionIdOrAllPartitions();
SystemRequestDetails srd = new SystemRequestDetails().setRequestPartitionId(partitionId);
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParameters.getGroupId()), srd);
JpaPid pidOrNull = myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), group);
JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
List<MdmPidTuple<JpaPid>> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
goldenPidSourcePidTuple.forEach(tuple -> {
patientPidsToExport.add(tuple.getGoldenPid());
@ -352,7 +354,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theParameters.getJobId()),
null,
RequestPartitionId.allPartitions());
theParameters.getPartitionIdOrAllPartitions());
while (resultIterator.hasNext()) {
resPids.add(resultIterator.next());
@ -447,17 +449,18 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
}
//Execute query and all found pids to our local iterator.
RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(expandedSpMap,
new SearchRuntimeDetails(null, theParams.getJobId()),
null,
RequestPartitionId.allPartitions());
partitionId);
while (resultIterator.hasNext()) {
theReadPids.add(resultIterator.next());
}
// add _include to results to support ONC
Set<Include> includes = Collections.singleton(new Include("*", true));
SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId);
Set<JpaPid> includeIds = searchBuilder.loadIncludes(myContext, myEntityManager, theReadPids, includes, false, expandedSpMap.getLastUpdated(), theParams.getJobId(), requestDetails, null);
// gets rid of the Patient duplicates
theReadPids.addAll(includeIds.stream().filter((id) -> !id.getResourceType().equals("Patient")).collect(Collectors.toSet()));
@ -507,9 +510,10 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
*/
private Set<JpaPid> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) {
Set<JpaPid> expandedIds = new HashSet<>();
SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId);
IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails);
JpaPid pidOrNull = myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), group);
JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
//Attempt to perform MDM Expansion of membership
if (theParams.isExpandMdm()) {

View File

@ -22,17 +22,16 @@ import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
@ -45,6 +44,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -60,18 +60,19 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JpaBulkExportProcessorTest {
private class ListResultIterator implements IResultIterator {
private static class ListResultIterator implements IResultIterator<JpaPid> {
private List<IResourcePersistentId> myList;
private final List<JpaPid> myList;
private int index;
public ListResultIterator(List<IResourcePersistentId> theList) {
public ListResultIterator(List<JpaPid> theList) {
myList = theList;
}
@ -86,7 +87,7 @@ public class JpaBulkExportProcessorTest {
}
@Override
public Collection<IResourcePersistentId> getNextResultBatch(long theBatchSize) {
public Collection<JpaPid> getNextResultBatch(long theBatchSize) {
return null;
}
@ -101,7 +102,7 @@ public class JpaBulkExportProcessorTest {
}
@Override
public IResourcePersistentId next() {
public JpaPid next() {
return myList.get(index++);
}
}
@ -119,13 +120,13 @@ public class JpaBulkExportProcessorTest {
private DaoRegistry myDaoRegistry;
@Mock
private SearchBuilderFactory mySearchBuilderFactory;
private SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
@Mock
private IIdHelperService myIdHelperService;
private IIdHelperService<JpaPid> myIdHelperService;
@Mock
private IMdmLinkDao myMdmLinkDao;
private IMdmLinkDao<JpaPid,?> myMdmLinkDao;
@Mock
private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
@ -144,29 +145,31 @@ public class JpaBulkExportProcessorTest {
return parameters;
}
private List<IPrimitiveType> createPatientTypes() {
private List<IIdType> createPatientTypes() {
long id1 = 123;
long id2 = 456;
String patient1Id = "Patient/" + id1;
String patient2Id = "Patient/" + id2;
List<IPrimitiveType> patientTypes = Arrays.asList(
return Arrays.asList(
new IdDt(patient1Id),
new IdDt(patient2Id)
);
return patientTypes;
}
private MdmPidTuple createTuple(long theGroupId, long theGoldenId) {
private MdmPidTuple<JpaPid> createTuple(long theGroupId, long theGoldenId) {
return MdmPidTuple.fromGoldenAndSource(JpaPid.fromId(theGoldenId), JpaPid.fromId(theGroupId));
}
@Test
public void getResourcePidIterator_paramsWithPatientExportStyle_returnsAnIterator() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void getResourcePidIterator_paramsWithPatientExportStyle_returnsAnIterator(boolean thePartitioned) {
// setup
ExportPIDIteratorParameters parameters = createExportParameters(BulkDataExportOptions.ExportStyle.PATIENT);
parameters.setResourceType("Patient");
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));
SearchParameterMap map = new SearchParameterMap();
List<SearchParameterMap> maps = new ArrayList<>();
maps.add(map);
@ -178,8 +181,8 @@ public class JpaBulkExportProcessorTest {
);
// extra mocks
IFhirResourceDao<?> mockDao = mock(IFhirResourceDao.class);
ISearchBuilder searchBuilder = mock(ISearchBuilder.class);
IFhirResourceDao<Patient> mockDao = mock(IFhirResourceDao.class);
ISearchBuilder<JpaPid> searchBuilder = mock(ISearchBuilder.class);
// when
when(myStorageSettings.getIndexMissingFields())
@ -196,7 +199,7 @@ public class JpaBulkExportProcessorTest {
eq(map),
any(SearchRuntimeDetails.class),
any(),
eq(RequestPartitionId.allPartitions())))
eq(getPartitionIdFromParams(thePartitioned))))
.thenReturn(resultIterator);
// test
@ -211,6 +214,14 @@ public class JpaBulkExportProcessorTest {
assertFalse(pidIterator.hasNext());
}
private RequestPartitionId getPartitionIdFromParams(boolean thePartitioned) {
if (thePartitioned) {
return RequestPartitionId.fromPartitionName("Partition-A");
} else {
return RequestPartitionId.allPartitions();
}
}
@Test
public void getResourcePidIterator_patientStyleWithIndexMissingFieldsDisabled_throws() {
// setup
@ -231,32 +242,33 @@ public class JpaBulkExportProcessorTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void getResourcePidIterator_groupExportStyleWithPatientResource_returnsIterator(boolean theMdm) {
@CsvSource({"false, false", "false, true", "true, true", "true, false"})
public void getResourcePidIterator_groupExportStyleWithPatientResource_returnsIterator(boolean theMdm, boolean thePartitioned) {
// setup
ExportPIDIteratorParameters parameters = createExportParameters(BulkDataExportOptions.ExportStyle.GROUP);
parameters.setResourceType("Patient");
JpaPid groupId = JpaPid.fromId(Long.parseLong(parameters.getGroupId()));
long groupGoldenPid = 4567l;
long groupGoldenPid = 4567;
Group groupResource = new Group();
groupResource.setId(parameters.getGroupId());
List<IPrimitiveType> patientTypes = createPatientTypes();
List<IResourcePersistentId> pids = new ArrayList<>();
for (IPrimitiveType type : patientTypes) {
pids.add(JpaPid.fromId(((IdDt) type).getIdPartAsLong()));
List<IIdType> patientTypes = createPatientTypes();
List<JpaPid> pids = new ArrayList<>();
for (IIdType type : patientTypes) {
pids.add(JpaPid.fromId(type.getIdPartAsLong()));
}
MdmPidTuple tuple = createTuple(groupId.getId(), groupGoldenPid);
MdmPidTuple<JpaPid> tuple = createTuple(groupId.getId(), groupGoldenPid);
IFhirResourceDao<Group> groupDao = mock(IFhirResourceDao.class);
parameters.setExpandMdm(theMdm); // set mdm expansion
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));
// extra mocks
IFhirResourceDao<?> mockDao = mock(IFhirResourceDao.class);
ISearchBuilder searchBuilder = mock(ISearchBuilder.class);
ISearchBuilder<JpaPid> searchBuilder = mock(ISearchBuilder.class);
// from getMembersFromGroupWithFilter
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(any(RuntimeResourceDefinition.class), eq(parameters), any(boolean.class)))
@ -271,7 +283,7 @@ public class JpaBulkExportProcessorTest {
any(SearchParameterMap.class),
any(SearchRuntimeDetails.class),
any(),
eq(RequestPartitionId.allPartitions())))
eq(getPartitionIdFromParams(thePartitioned))))
.thenReturn(new ListResultIterator(pids));
// mdm expansion stuff
@ -282,16 +294,16 @@ public class JpaBulkExportProcessorTest {
.thenReturn(groupResource);
when(myIdHelperService.translatePidsToForcedIds(any(Set.class)))
.thenAnswer(params -> {
Set<IResourcePersistentId> uniqPids = params.getArgument(0);
HashMap<IResourcePersistentId, Optional<String>> answer = new HashMap<>();
for (IResourcePersistentId l : uniqPids) {
Set<JpaPid> uniqPids = params.getArgument(0);
HashMap<JpaPid, Optional<String>> answer = new HashMap<>();
for (JpaPid l : uniqPids) {
answer.put(l, Optional.empty());
}
return new PersistentIdToForcedIdMap(answer);
return new PersistentIdToForcedIdMap<>(answer);
});
when(myIdHelperService.getPidOrNull(any(), any(Group.class)))
when(myIdHelperService.getPidOrNull(eq(getPartitionIdFromParams(thePartitioned)), any(Group.class)))
.thenReturn(groupId);
when(myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(any(BaseResourcePersistentId.class), eq(MdmMatchResultEnum.MATCH)))
when(myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(any(JpaPid.class), eq(MdmMatchResultEnum.MATCH)))
.thenReturn(Collections.singletonList(tuple));
when(myMdmExpansionCacheSvc.hasBeenPopulated())
.thenReturn(false); // does not matter, since if false, it then goes and populates
@ -317,11 +329,27 @@ public class JpaBulkExportProcessorTest {
}
int total = pids.size();
assertEquals(total, count);
if (theMdm) {
ArgumentCaptor<SystemRequestDetails> requestDetailsCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(groupDao).read(eq(new IdDt(parameters.getGroupId())), requestDetailsCaptor.capture());
validatePartitionId(thePartitioned, requestDetailsCaptor.getValue().getRequestPartitionId());
}
}
private void validatePartitionId(boolean thePartitioned, RequestPartitionId thePartitionId) {
if (thePartitioned) {
assertNotNull(thePartitionId.getPartitionNames());
assertEquals("Partition-A", thePartitionId.getPartitionNames().get(0));
} else {
assertEquals(RequestPartitionId.allPartitions(), thePartitionId);
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void getResourcePidIterator_groupExportNonPatient_returnsIterator(boolean theMdm) {
@CsvSource({"false, false", "false, true", "true, true", "true, false"})
public void getResourcePidIterator_groupExportStyleWithNonPatientResource_returnsIterator(boolean theMdm, boolean thePartitioned) {
// setup
ExportPIDIteratorParameters parameters = createExportParameters(BulkDataExportOptions.ExportStyle.GROUP);
parameters.setResourceType("Observation");
@ -329,59 +357,89 @@ public class JpaBulkExportProcessorTest {
JpaPid groupId = JpaPid.fromId(Long.parseLong(parameters.getGroupId()));
Group groupResource = new Group();
groupResource.setId(parameters.getGroupId());
long groupGoldenPid = 4567l;
long groupGoldenPid = 4567;
JpaPid pid = JpaPid.fromId(123L);
JpaPid pid2 = JpaPid.fromId(456L);
ListResultIterator resultIterator = new ListResultIterator(
Arrays.asList(pid, pid2)
JpaPid patientPid = JpaPid.fromId(123L);
JpaPid patientPid2 = JpaPid.fromId(456L);
ListResultIterator patientResultsIterator = new ListResultIterator(
Arrays.asList(patientPid, patientPid2)
);
MdmPidTuple tuple = createTuple(groupId.getId(), groupGoldenPid);
List<IPrimitiveType> patientTypes = createPatientTypes();
JpaPid observationPid = JpaPid.fromId(234L);
JpaPid observationPid2 = JpaPid.fromId(567L);
ListResultIterator observationResultsIterator = new ListResultIterator(
Arrays.asList(observationPid, observationPid2)
);
HashSet<JpaPid> observationPidSet = new HashSet<>();
observationPidSet.add(observationPid);
observationPidSet.add(observationPid2);
MdmPidTuple<JpaPid> tuple = createTuple(groupId.getId(), groupGoldenPid);
IFhirResourceDao<Patient> patientDao = mock(IFhirResourceDao.class);
IFhirResourceDao<Group> groupDao = mock(IFhirResourceDao.class);
IFhirResourceDao<Observation> observationDao = mock(IFhirResourceDao.class);
parameters.setExpandMdm(theMdm); // set mdm expansion
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));
// extra mocks
IFhirResourceDao<?> mockDao = mock(IFhirResourceDao.class);
ISearchBuilder searchBuilder = mock(ISearchBuilder.class);
ISearchBuilder<JpaPid> patientSearchBuilder = mock(ISearchBuilder.class);
ISearchBuilder<JpaPid> observationSearchBuilder = mock(ISearchBuilder.class);
// when
// expandAllPatientPidsFromGroup
when(myDaoRegistry.getResourceDao(eq("Group")))
.thenReturn(groupDao);
when(groupDao.read(any(IIdType.class), any(SystemRequestDetails.class)))
.thenReturn(groupResource);
when(myIdHelperService.getPidOrNull(any(), eq(groupResource)))
when(myIdHelperService.getPidOrNull(eq(getPartitionIdFromParams(thePartitioned)), eq(groupResource)))
.thenReturn(groupId);
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(any(RuntimeResourceDefinition.class), eq(parameters), any(boolean.class)))
.thenReturn(Collections.singletonList(new SearchParameterMap()));
when(myDaoRegistry.getResourceDao(not(eq("Group"))))
.thenReturn(mockDao);
when(mySearchBuilderFactory.newSearchBuilder(eq(mockDao), not(eq("Group")), any()))
.thenReturn(searchBuilder);
// getMembersFromGroupWithFilter
when(myDaoRegistry.getResourceDao(eq("Patient")))
.thenReturn(patientDao);
when(mySearchBuilderFactory.newSearchBuilder(eq(patientDao), eq("Patient"), eq(Patient.class)))
.thenReturn(patientSearchBuilder);
RuntimeResourceDefinition patientDef = myFhirContext.getResourceDefinition("Patient");
SearchParameterMap patientSpMap = new SearchParameterMap();
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(eq(patientDef), eq(parameters), any(boolean.class)))
.thenReturn(Collections.singletonList(patientSpMap));
when(patientSearchBuilder.createQuery(eq(patientSpMap), any(), any(), eq(getPartitionIdFromParams(thePartitioned))))
.thenReturn(patientResultsIterator);
// queryResourceTypeWithReferencesToPatients
SearchParameterMap observationSpMap = new SearchParameterMap();
RuntimeResourceDefinition observationDef = myFhirContext.getResourceDefinition("Observation");
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(eq(observationDef), eq(parameters), any(boolean.class)))
.thenReturn(Collections.singletonList(observationSpMap));
when(myDaoRegistry.getResourceDao((eq("Observation"))))
.thenReturn(observationDao);
when(mySearchBuilderFactory.newSearchBuilder(eq(observationDao), eq("Observation"), eq(Observation.class)))
.thenReturn(observationSearchBuilder);
when(observationSearchBuilder.loadIncludes(any(), any(), eq(observationPidSet), any(), eq(false), any(), any(),
any(SystemRequestDetails.class), any()))
.thenReturn(new HashSet<>());
// ret
when(searchBuilder.createQuery(
any(SearchParameterMap.class),
when(observationSearchBuilder.createQuery(
eq(observationSpMap),
any(SearchRuntimeDetails.class),
any(),
eq(RequestPartitionId.allPartitions())))
.thenReturn(new ListResultIterator(Collections.singletonList(pid)))
.thenReturn(resultIterator);
eq(getPartitionIdFromParams(thePartitioned))))
.thenReturn(observationResultsIterator);
if (theMdm) {
when(myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(any(BaseResourcePersistentId.class), eq(MdmMatchResultEnum.MATCH)))
when(myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(any(JpaPid.class), eq(MdmMatchResultEnum.MATCH)))
.thenReturn(Collections.singletonList(tuple));
when(myIdHelperService.translatePidsToForcedIds(any(Set.class)))
.thenAnswer(params -> {
Set<IResourcePersistentId> uniqPids = params.getArgument(0);
HashMap<IResourcePersistentId, Optional<String>> answer = new HashMap<>();
for (IResourcePersistentId l : uniqPids) {
Set<JpaPid> uniqPids = params.getArgument(0);
HashMap<JpaPid, Optional<String>> answer = new HashMap<>();
for (JpaPid l : uniqPids) {
answer.put(l, Optional.empty());
}
return new PersistentIdToForcedIdMap(answer);
return new PersistentIdToForcedIdMap<>(answer);
});
}
@ -391,14 +449,24 @@ public class JpaBulkExportProcessorTest {
// verify
assertNotNull(pidIterator, "PID iterator null for mdm = " + theMdm);
assertTrue(pidIterator.hasNext(), "PID iterator empty for mdm = " + theMdm);
ArgumentCaptor<SystemRequestDetails> groupDaoReadSystemRequestDetailsCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(groupDao).read(any(IIdType.class), groupDaoReadSystemRequestDetailsCaptor.capture());
validatePartitionId(thePartitioned, groupDaoReadSystemRequestDetailsCaptor.getValue().getRequestPartitionId());
ArgumentCaptor<SystemRequestDetails> searchBuilderLoadIncludesRequestDetailsCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(observationSearchBuilder).loadIncludes(any(), any(), eq(observationPidSet), any(), eq(false), any(), any(),
searchBuilderLoadIncludesRequestDetailsCaptor.capture(), any());
validatePartitionId(thePartitioned, searchBuilderLoadIncludesRequestDetailsCaptor.getValue().getRequestPartitionId());
}
@Test
public void getResourcePidIterator_systemExport_returnsIterator() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void getResourcePidIterator_systemExport_returnsIterator(boolean thePartitioned) {
// setup
ExportPIDIteratorParameters parameters = createExportParameters(BulkDataExportOptions.ExportStyle.SYSTEM);
parameters.setResourceType("Patient");
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));
JpaPid pid = JpaPid.fromId(123L);
JpaPid pid2 = JpaPid.fromId(456L);
ListResultIterator resultIterator = new ListResultIterator(
@ -407,7 +475,7 @@ public class JpaBulkExportProcessorTest {
// extra mocks
IFhirResourceDao<Patient> dao = mock(IFhirResourceDao.class);
ISearchBuilder searchBuilder = mock(ISearchBuilder.class);
ISearchBuilder<JpaPid> searchBuilder = mock(ISearchBuilder.class);
// when
when(myBulkExportHelperService.createSearchParameterMapsForResourceType(
@ -426,7 +494,7 @@ public class JpaBulkExportProcessorTest {
any(SearchParameterMap.class),
any(SearchRuntimeDetails.class),
any(),
eq(RequestPartitionId.allPartitions())
eq(getPartitionIdFromParams(thePartitioned))
)).thenReturn(resultIterator);
// test
@ -437,7 +505,7 @@ public class JpaBulkExportProcessorTest {
assertTrue(iterator.hasNext());
int count = 0;
while (iterator.hasNext()) {
IResourcePersistentId ret = iterator.next();
JpaPid ret = iterator.next();
assertTrue(
ret.equals(pid) || ret.equals(pid2)
);
@ -445,4 +513,45 @@ public class JpaBulkExportProcessorTest {
}
assertEquals(2, count);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void getResourcePidIterator_groupExportStyleWithGroupResource_returnsAnIterator(boolean thePartitioned) {
// setup
ExportPIDIteratorParameters parameters = createExportParameters(BulkDataExportOptions.ExportStyle.GROUP);
parameters.setResourceType("Group");
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));
Long groupId = Long.parseLong(parameters.getGroupId());
JpaPid pid = JpaPid.fromId(groupId);
Group groupResource = new Group();
groupResource.setId(parameters.getGroupId());
// extra mocks
IFhirResourceDao<Group> mockDao = mock(IFhirResourceDao.class);
// when
when(myDaoRegistry.getResourceDao(eq("Group")))
.thenReturn(mockDao);
when(mockDao.read(any(IdDt.class), any(SystemRequestDetails.class)))
.thenReturn(groupResource);
// ret
when(myIdHelperService.getPidOrNull(eq(getPartitionIdFromParams(thePartitioned)), eq(groupResource)))
.thenReturn(pid);
// test
Iterator<JpaPid> pidIterator = myProcessor.getResourcePidIterator(parameters);
// verify
assertNotNull(pidIterator);
assertTrue(pidIterator.hasNext());
assertEquals(pid, pidIterator.next());
assertFalse(pidIterator.hasNext());
ArgumentCaptor<SystemRequestDetails> resourceDaoServletRequestDetailsCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(mockDao).read(any(IdDt.class), resourceDaoServletRequestDetailsCaptor.capture());
validatePartitionId(thePartitioned, resourceDaoServletRequestDetailsCaptor.getValue().getRequestPartitionId());
}
}

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
@ -14,10 +16,14 @@ import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.RequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
import ca.uhn.fhir.rest.server.HardcodedServerAddressStrategy;
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.tenant.UrlBaseTenantIdentificationStrategy;
import ca.uhn.fhir.test.utilities.HttpClientExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.util.JsonUtil;
@ -32,12 +38,14 @@ import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
@ -94,24 +102,56 @@ public class BulkDataExportProviderTest {
@RegisterExtension
private final RestfulServerExtension myServer = new RestfulServerExtension(myCtx)
.withServer(s -> s.registerProvider(myProvider));
@Spy
private RequestPartitionHelperSvc myRequestPartitionHelperSvc = new MyRequestPartitionHelperSvc();
private JpaStorageSettings myStorageSettings;
private DaoRegistry myDaoRegistry;
private final RequestPartitionId myRequestPartitionId = RequestPartitionId.fromPartitionIdAndName(123, "Partition-A");
private final String myPartitionName = "Partition-A";
private final String myFixedBaseUrl = "http:/myfixedbaseurl.com";
private class MyRequestPartitionHelperSvc extends RequestPartitionHelperSvc {
@Override
public @NotNull RequestPartitionId determineReadPartitionForRequest(RequestDetails theRequest, ReadPartitionIdRequestDetails theDetails) {
assert theRequest != null;
if (myPartitionName.equals(theRequest.getTenantId())) {
return myRequestPartitionId;
} else {
return RequestPartitionId.fromPartitionName(theRequest.getTenantId());
}
}
@Override
public void validateHasPartitionPermissions(RequestDetails theRequest, String theResourceType, RequestPartitionId theRequestPartitionId) {
if (!myPartitionName.equals(theRequest.getTenantId()) && theRequest.getTenantId() != null) {
throw new ForbiddenOperationException("User does not have access to resources on the requested partition");
}
}
}
@BeforeEach
public void injectStorageSettings() {
myStorageSettings = new JpaStorageSettings();
myProvider.setStorageSettings(myStorageSettings);
myDaoRegistry = mock(DaoRegistry.class);
lenient().when(myDaoRegistry.getRegisteredDaoTypes()).thenReturn(Set.of("Patient", "Observation", "Encounter"));
myProvider.setDaoRegistry(myDaoRegistry);
DaoRegistry daoRegistry = mock(DaoRegistry.class);
lenient().when(daoRegistry.getRegisteredDaoTypes()).thenReturn(Set.of("Patient", "Observation", "Encounter"));
myProvider.setDaoRegistry(daoRegistry);
}
public void startWithFixedBaseUrl() {
String baseUrl = myServer.getBaseUrl() + "/fixedvalue";
HardcodedServerAddressStrategy hardcodedServerAddressStrategy = new HardcodedServerAddressStrategy(baseUrl);
HardcodedServerAddressStrategy hardcodedServerAddressStrategy = new HardcodedServerAddressStrategy(myFixedBaseUrl);
myServer.withServer(s -> s.setServerAddressStrategy(hardcodedServerAddressStrategy));
}
public void enablePartitioning() {
myServer.getRestfulServer().setTenantIdentificationStrategy(new UrlBaseTenantIdentificationStrategy());
}
private BulkExportParameters verifyJobStart() {
ArgumentCaptor<Batch2BaseJobParameters> startJobCaptor = ArgumentCaptor.forClass(Batch2BaseJobParameters.class);
verify(myJobRunner).startNewJob(startJobCaptor.capture());
@ -132,13 +172,21 @@ public class BulkDataExportProviderTest {
}
@ParameterizedTest
@MethodSource("paramsProvider")
public void testSuccessfulInitiateBulkRequest_Post_WithFixedBaseURL(Boolean baseUrlFixed) throws IOException {
@CsvSource({"false, false", "false, true", "true, true", "true, false"})
public void testSuccessfulInitiateBulkRequest_Post_WithFixedBaseURLAndPartitioning(Boolean baseUrlFixed, Boolean partitioningEnabled) throws IOException {
// setup
if (baseUrlFixed) {
startWithFixedBaseUrl();
}
String myBaseUriForExport;
if (partitioningEnabled) {
enablePartitioning();
myBaseUriForExport = myServer.getBaseUrl() + "/" + myPartitionName;
} else {
myBaseUriForExport = myServer.getBaseUrl();
}
String patientResource = "Patient";
String practitionerResource = "Practitioner";
String filter = "Patient?identifier=foo";
@ -156,16 +204,29 @@ public class BulkDataExportProviderTest {
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
// test
HttpPost post = new HttpPost(myServer.getBaseUrl() + "/" + JpaConstants.OPERATION_EXPORT);
HttpPost post = new HttpPost(myBaseUriForExport + "/" + JpaConstants.OPERATION_EXPORT);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", response.toString());
String baseUrl;
if (baseUrlFixed) {
// If a fixed Base URL is assigned, then the URLs in the poll response should similarly start with the fixed base URL.
baseUrl = myFixedBaseUrl;
} else {
// Otherwise the URLs in the poll response should start with the default server URL.
baseUrl = myServer.getBaseUrl();
}
if(partitioningEnabled) {
baseUrl = baseUrl + "/" + myPartitionName;
}
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals(myServer.getBaseUrl() + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
assertEquals(baseUrl + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
BulkExportParameters params = verifyJobStart();
@ -197,13 +258,21 @@ public class BulkDataExportProviderTest {
}
@Test
public void testSuccessfulInitiateBulkRequest_Get() throws IOException {
@ParameterizedTest
@MethodSource("paramsProvider")
public void testSuccessfulInitiateBulkRequest_GetWithPartitioning(boolean partitioningEnabled) throws IOException {
when(myJobRunner.startNewJob(any())).thenReturn(createJobStartResponse());
InstantType now = InstantType.now();
String url = myServer.getBaseUrl() + "/" + JpaConstants.OPERATION_EXPORT
String myBaseUrl;
if (partitioningEnabled) {
enablePartitioning();
myBaseUrl = myServer.getBaseUrl() + "/" + myPartitionName;
} else {
myBaseUrl = myServer.getBaseUrl();
}
String url = myBaseUrl + "/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Patient, Practitioner")
+ "&" + JpaConstants.PARAM_EXPORT_SINCE + "=" + UrlUtil.escapeUrlParam(now.getValueAsString())
@ -217,7 +286,7 @@ public class BulkDataExportProviderTest {
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals(myServer.getBaseUrl() + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
assertEquals(myBaseUrl + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
BulkExportParameters params = verifyJobStart();
@ -315,13 +384,22 @@ public class BulkDataExportProviderTest {
}
@ParameterizedTest
@MethodSource("paramsProvider")
public void testPollForStatus_COMPLETED_WithFixedBaseURL(boolean baseUrlFixed) throws IOException {
@CsvSource({"false, false", "false, true", "true, true", "true, false"})
public void testPollForStatus_COMPLETED_WithFixedBaseURLAndPartitioning(boolean baseUrlFixed, boolean partitioningEnabled) throws IOException {
// setup
if (baseUrlFixed) {
startWithFixedBaseUrl();
}
String myBaseUriForExport;
if (partitioningEnabled) {
enablePartitioning();
myBaseUriForExport = myServer.getBaseUrl() + "/" + myPartitionName;
} else {
myBaseUriForExport = myServer.getBaseUrl();
}
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(A_JOB_ID);
info.setStatus(BulkExportJobStatusEnum.COMPLETE);
@ -336,19 +414,35 @@ public class BulkDataExportProviderTest {
map.put("Patient", ids);
results.setResourceTypeToBinaryIds(map);
info.setReport(JsonUtil.serialize(results));
if (partitioningEnabled) {
info.setRequestPartitionId(myRequestPartitionId);
}
// when
when(myJobRunner.getJobInfo(eq(A_JOB_ID)))
.thenReturn(info);
// call
String url = myServer.getBaseUrl() + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
String url = myBaseUriForExport + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
String myBaseUriForPoll;
if (baseUrlFixed) {
// If a fixed Base URL is provided, the URLs in the poll response should similarly start with the fixed Base URL.
myBaseUriForPoll = myFixedBaseUrl;
} else {
// Otherwise the URLs in the poll response should instead with the default server URL.
myBaseUriForPoll = myServer.getBaseUrl();
}
if (partitioningEnabled) {
// If partitioning is enabled, then the URLs in the poll response should also have the partition name.
myBaseUriForPoll = myBaseUriForPoll + "/"+ myPartitionName;
}
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals("OK", response.getStatusLine().getReasonPhrase());
assertEquals(Constants.CT_JSON, response.getEntity().getContentType().getValue());
@ -358,11 +452,51 @@ public class BulkDataExportProviderTest {
BulkExportResponseJson responseJson = JsonUtil.deserialize(responseContent, BulkExportResponseJson.class);
assertEquals(3, responseJson.getOutput().size());
assertEquals("Patient", responseJson.getOutput().get(0).getType());
assertEquals(myServer.getBaseUrl() + "/Binary/111", responseJson.getOutput().get(0).getUrl());
assertEquals(myBaseUriForPoll + "/Binary/111", responseJson.getOutput().get(0).getUrl());
assertEquals("Patient", responseJson.getOutput().get(1).getType());
assertEquals(myServer.getBaseUrl() + "/Binary/222", responseJson.getOutput().get(1).getUrl());
assertEquals(myBaseUriForPoll + "/Binary/222", responseJson.getOutput().get(1).getUrl());
assertEquals("Patient", responseJson.getOutput().get(2).getType());
assertEquals(myServer.getBaseUrl() + "/Binary/333", responseJson.getOutput().get(2).getUrl());
assertEquals(myBaseUriForPoll + "/Binary/333", responseJson.getOutput().get(2).getUrl());
}
}
@Test
public void testPollForStatus_WithInvalidPartition() throws IOException {
// setup
enablePartitioning();
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(A_JOB_ID);
info.setStatus(BulkExportJobStatusEnum.COMPLETE);
info.setEndTime(InstantType.now().getValue());
info.setRequestPartitionId(myRequestPartitionId);
ArrayList<String> ids = new ArrayList<>();
ids.add(new IdType("Binary/111").getValueAsString());
ids.add(new IdType("Binary/222").getValueAsString());
ids.add(new IdType("Binary/333").getValueAsString());
BulkExportJobResults results = new BulkExportJobResults();
HashMap<String, List<String>> map = new HashMap<>();
map.put("Patient", ids);
results.setResourceTypeToBinaryIds(map);
info.setReport(JsonUtil.serialize(results));
// when
when(myJobRunner.getJobInfo(eq(A_JOB_ID)))
.thenReturn(info);
// call
String myBaseUriForExport = myServer.getBaseUrl() + "/Partition-B";
String url = myBaseUriForExport + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(403, response.getStatusLine().getStatusCode());
assertEquals("Forbidden", response.getStatusLine().getReasonPhrase());
}
}
@ -771,13 +905,17 @@ public class BulkDataExportProviderTest {
}
@Test
public void testDeleteForOperationPollStatus_SUBMITTED_ShouldCancelJobSuccessfully() throws IOException {
@ParameterizedTest
@MethodSource("paramsProvider")
public void testDeleteForOperationPollStatus_SUBMITTED_ShouldCancelJobSuccessfully(boolean partitioningEnabled) throws IOException {
// setup
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(A_JOB_ID);
info.setStatus(BulkExportJobStatusEnum.SUBMITTED);
info.setEndTime(InstantType.now().getValue());
if (partitioningEnabled) {
info.setRequestPartitionId(myRequestPartitionId);
}
Batch2JobOperationResult result = new Batch2JobOperationResult();
result.setOperation("Cancel job instance " + A_JOB_ID);
result.setMessage("Job instance <" + A_JOB_ID + "> successfully cancelled.");
@ -790,7 +928,15 @@ public class BulkDataExportProviderTest {
.thenReturn(result);
// call
String url = myServer.getBaseUrl() + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
String baseUrl;
if (partitioningEnabled) {
enablePartitioning();
baseUrl = myServer.getBaseUrl() + "/" + myPartitionName;
} else {
baseUrl = myServer.getBaseUrl();
}
String url = baseUrl + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpDelete delete = new HttpDelete(url);
try (CloseableHttpResponse response = myClient.execute(delete)) {
@ -903,13 +1049,17 @@ public class BulkDataExportProviderTest {
}
}
@Test
public void testOperationExportPollStatus_POST_ExistingId_Accepted() throws IOException {
@ParameterizedTest
@MethodSource("paramsProvider")
public void testOperationExportPollStatus_POST_ExistingId_Accepted(boolean partititioningEnabled) throws IOException {
// setup
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(A_JOB_ID);
info.setStatus(BulkExportJobStatusEnum.SUBMITTED);
info.setEndTime(InstantType.now().getValue());
if(partititioningEnabled) {
info.setRequestPartitionId(myRequestPartitionId);
}
// when
when(myJobRunner.getJobInfo(eq(A_JOB_ID)))
@ -920,8 +1070,16 @@ public class BulkDataExportProviderTest {
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(ca.uhn.fhir.rest.api.Constants.CT_FHIR_NDJSON));
input.addParameter(JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID, new StringType(A_JOB_ID));
String baseUrl;
if (partititioningEnabled) {
enablePartitioning();
baseUrl = myServer.getBaseUrl() + "/" + myPartitionName;
} else {
baseUrl = myServer.getBaseUrl();
}
// Initiate Export Poll Status
HttpPost post = new HttpPost(myServer.getBaseUrl() + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS);
HttpPost post = new HttpPost(baseUrl + "/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS);
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(new ResourceEntity(myCtx, input));
@ -964,6 +1122,56 @@ public class BulkDataExportProviderTest {
}
}
@Test
public void testFailBulkExportRequest_PartitionedWithoutPermissions() throws IOException {
// setup
enablePartitioning();
// test
String url = myServer.getBaseUrl() + "/Partition-B/" + JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON)
+ "&" + JpaConstants.PARAM_EXPORT_TYPE + "=" + UrlUtil.escapeUrlParam("Patient, Practitioner");
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
ourLog.info("Request: {}", url);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(403, response.getStatusLine().getStatusCode());
assertEquals("Forbidden", response.getStatusLine().getReasonPhrase());
}
}
@Test
public void testFailPollRequest_PartitionedWithoutPermissions() throws IOException {
// setup
enablePartitioning();
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(A_JOB_ID);
info.setStatus(BulkExportJobStatusEnum.BUILDING);
info.setEndTime(new Date());
info.setRequestPartitionId(myRequestPartitionId);
// when
when(myJobRunner.getJobInfo(eq(A_JOB_ID)))
.thenReturn(info);
// test
String url = myServer.getBaseUrl() + "/Partition-B/" + JpaConstants.OPERATION_EXPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_EXPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(403, response.getStatusLine().getStatusCode());
assertEquals("Forbidden", response.getStatusLine().getReasonPhrase());
}
}
static Stream<Arguments> paramsProvider() {
return Stream.of(
Arguments.arguments(true),

View File

@ -1,11 +1,12 @@
package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
@ -13,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.RequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
@ -44,7 +46,7 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.mockito.Spy;
import org.springframework.mock.web.MockHttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -67,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -75,9 +76,6 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("Duplicates")
public class MultitenantServerR4Test extends BaseMultitenantResourceProviderR4Test implements ITestDataBuilder {
@Autowired
private IBulkDataExportJobSchedulingHelper myBulkDataExportJobSchedulingHelper;
@Override
@AfterEach
public void after() throws Exception {
@ -624,6 +622,25 @@ public class MultitenantServerR4Test extends BaseMultitenantResourceProviderR4Te
@Mock
private IBatch2JobRunner myJobRunner;
@Spy
private RequestPartitionHelperSvc myRequestPartitionHelperSvc = new MultitenantServerR4Test.PartitionTesting.MyRequestPartitionHelperSvc();
String myTenantName = null;
private class MyRequestPartitionHelperSvc extends RequestPartitionHelperSvc {
@Override
public RequestPartitionId determineReadPartitionForRequest(RequestDetails theRequest, ReadPartitionIdRequestDetails theDetails) {
return RequestPartitionId.fromPartitionName(myTenantName);
}
@Override
public void validateHasPartitionPermissions(RequestDetails theRequest, String theResourceType, RequestPartitionId theRequestPartitionId) {
return;
}
}
@Test
public void testBulkExportForDifferentPartitions() throws IOException {
setBulkDataExportProvider();
@ -666,8 +683,6 @@ public class MultitenantServerR4Test extends BaseMultitenantResourceProviderR4Te
reqDetails.addHeader(Constants.HEADER_PREFER,
"respond-async");
servletRequestDetails.setServletRequest(reqDetails);
doReturn(JpaConstants.DEFAULT_PARTITION_NAME + "/")
.when(servletRequestDetails).getServerBaseForRequest();
when(servletRequestDetails.getServer())
.thenReturn(mockServer);
when(servletRequestDetails.getServletResponse())
@ -681,11 +696,12 @@ public class MultitenantServerR4Test extends BaseMultitenantResourceProviderR4Te
}
//perform export-poll-status
myTenantName = createInPartition;
HttpGet get = new HttpGet(buildExportUrl(createInPartition, jobId));
try (CloseableHttpResponse response = ourHttpClient.execute(get)) {
String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
BulkExportResponseJson responseJson = JsonUtil.deserialize(responseString, BulkExportResponseJson.class);
assertThat(responseJson.getOutput().get(0).getUrl(), containsString(JpaConstants.DEFAULT_PARTITION_NAME + "/Binary/"));
assertThat(responseJson.getOutput().get(0).getUrl(), containsString(createInPartition + "/Binary/"));
}
}

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
@ -69,7 +70,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
private FhirContext myFhirContext;
@Autowired
private IBulkExportProcessor myBulkExportProcessor;
private IBulkExportProcessor<?> myBulkExportProcessor;
@Autowired
private ApplicationContext myApplicationContext;
@ -96,7 +97,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
ourLog.info("About to expand {} resource IDs into their full resource bodies.", idList.getIds().size());
// search the resources
List<IBaseResource> allResources = fetchAllResources(idList);
List<IBaseResource> allResources = fetchAllResources(idList, jobParameters.getPartitionId());
// if necessary, expand resources
@ -136,7 +137,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
return RunOutcome.SUCCESS;
}
private List<IBaseResource> fetchAllResources(ResourceIdList theIds) {
private List<IBaseResource> fetchAllResources(ResourceIdList theIds, RequestPartitionId theRequestPartitionId) {
ArrayListMultimap<String, String> typeToIds = ArrayListMultimap.create();
theIds.getIds().forEach(t -> typeToIds.put(t.getResourceType(), t.getId()));
@ -173,7 +174,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
SearchParameterMap spMap = SearchParameterMap
.newSynchronous()
.add(PARAM_ID, idListParam);
IBundleProvider outcome = dao.search(spMap, new SystemRequestDetails());
IBundleProvider outcome = dao.search(spMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId));
resources.addAll(outcome.getAllResources());
}

View File

@ -69,6 +69,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
providerParams.setGroupId(params.getGroupId());
providerParams.setPatientIds(params.getPatientIds());
providerParams.setExpandMdm(params.isExpandMdm());
providerParams.setPartitionId(params.getPartitionId());
int submissionCount = 0;
try {

View File

@ -100,8 +100,14 @@ public class WriteBinaryStep implements IJobStepWorker<BulkExportJobParameters,
throw new JobExecutionFailedException(Msg.code(2238) + errorMsg);
}
DaoMethodOutcome outcome = binaryDao.create(binary,
new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
SystemRequestDetails srd = new SystemRequestDetails();
RequestPartitionId partitionId = theStepExecutionDetails.getParameters().getPartitionId();
if (partitionId == null){
srd.setRequestPartitionId(RequestPartitionId.defaultPartition());
} else {
srd.setRequestPartitionId(partitionId);
}
DaoMethodOutcome outcome = binaryDao.create(binary,srd);
IIdType id = outcome.getId();
BulkExportBinaryFileId bulkExportBinaryFileId = new BulkExportBinaryFileId();

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.batch2.jobs.export.models;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
@ -70,6 +71,9 @@ public class BulkExportJobParameters extends BulkExportJobBase {
@JsonProperty("expandMdm")
private boolean myExpandMdm;
@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;
public List<String> getResourceTypes() {
return myResourceTypes;
}
@ -142,6 +146,14 @@ public class BulkExportJobParameters extends BulkExportJobBase {
return myOriginalRequestUrl;
}
public void setPartitionId(RequestPartitionId thePartitionId) {
this.myPartitionId = thePartitionId;
}
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public static BulkExportJobParameters createFromExportJobParameters(BulkExportParameters theParameters) {
BulkExportJobParameters params = new BulkExportJobParameters();
params.setResourceTypes(theParameters.getResourceTypes());
@ -153,6 +165,7 @@ public class BulkExportJobParameters extends BulkExportJobBase {
params.setExpandMdm(theParameters.isExpandMdm());
params.setPatientIds(theParameters.getPatientIds());
params.setOriginalRequestUrl(theParameters.getOriginalRequestUrl());
params.setPartitionId(theParameters.getPartitionId());
return params;
}

View File

@ -104,6 +104,11 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
info.setEndTime(theInstance.getEndTime());
info.setReport(theInstance.getReport());
info.setErrorMsg(theInstance.getErrorMessage());
if ( Batch2JobDefinitionConstants.BULK_EXPORT.equals(theInstance.getJobDefinitionId())) {
BulkExportJobParameters parameters = theInstance.getParameters(BulkExportJobParameters.class);
info.setRequestPartitionId(parameters.getPartitionId());
}
return info;
}

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
@ -19,15 +20,16 @@ import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -54,7 +56,7 @@ import static org.mockito.Mockito.when;
public class ExpandResourcesStepTest {
@Mock
private IBulkExportProcessor myProcessor;
private IBulkExportProcessor<?> myProcessor;
@Mock
private DaoRegistry myDaoRegistry;
@ -63,7 +65,7 @@ public class ExpandResourcesStepTest {
private ResponseTerminologyTranslationSvc myResponseTerminologyTranslationSvc;
@Mock
IIdHelperService myIdHelperService;
IIdHelperService<JpaPid> myIdHelperService;
@Spy
private FhirContext myFhirContext = FhirContext.forR4Cached();
@ -77,36 +79,39 @@ public class ExpandResourcesStepTest {
@InjectMocks
private ExpandResourcesStep mySecondStep;
private BulkExportJobParameters createParameters() {
private BulkExportJobParameters createParameters(boolean thePartitioned) {
BulkExportJobParameters parameters = new BulkExportJobParameters();
parameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
parameters.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
parameters.setOutputFormat("json");
parameters.setStartDate(new Date());
if (thePartitioned) {
parameters.setPartitionId(RequestPartitionId.fromPartitionName("Partition-A"));
}
return parameters;
}
private StepExecutionDetails<BulkExportJobParameters, ResourceIdList> createInput(ResourceIdList theData,
BulkExportJobParameters theParameters,
JobInstance theInstance) {
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> input = new StepExecutionDetails<>(
return new StepExecutionDetails<>(
theParameters,
theData,
theInstance,
"1"
);
return input;
}
private IFhirResourceDao<?> mockOutDaoRegistry() {
IFhirResourceDao mockDao = mock(IFhirResourceDao.class);
IFhirResourceDao<?> mockDao = mock(IFhirResourceDao.class);
when(myDaoRegistry.getResourceDao(anyString()))
.thenReturn(mockDao);
return mockDao;
}
@Test
public void jobComplete_withBasicParameters_succeeds() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void jobComplete_withBasicParameters_succeeds(boolean thePartitioned) {
//setup
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
@ -131,13 +136,13 @@ public class ExpandResourcesStepTest {
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> input = createInput(
idList,
createParameters(),
createParameters(thePartitioned),
instance
);
when(patientDao.search(any(), any())).thenReturn(new SimpleBundleProvider(resources));
when(myIdHelperService.newPidFromStringIdAndResourceName(anyString(), anyString())).thenReturn(JpaPid.fromId(1L));
when(myIdHelperService.translatePidsToForcedIds(any())).thenAnswer(t->{
Set<IResourcePersistentId<?>> inputSet = t.getArgument(0, Set.class);
Set<IResourcePersistentId<JpaPid>> inputSet = t.getArgument(0, Set.class);
Map<IResourcePersistentId<?>, Optional<String>> map = new HashMap<>();
for (var next : inputSet) {
map.put(next, Optional.empty());
@ -164,5 +169,11 @@ public class ExpandResourcesStepTest {
assertFalse(stringifiedElement.contains("\t"));
assertFalse(stringifiedElement.contains("\n"));
assertFalse(stringifiedElement.contains(" "));
// Patient Search
ArgumentCaptor<SystemRequestDetails> patientSearchCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(patientDao).search(any(), patientSearchCaptor.capture());
assertEquals(input.getParameters().getPartitionId(), patientSearchCaptor.getValue().getRequestPartitionId());
}
}

View File

@ -8,12 +8,12 @@ import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
@ -22,6 +22,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -52,7 +54,7 @@ public class FetchResourceIdsStepTest {
private ListAppender<ILoggingEvent> myAppender;
@Mock
private IBulkExportProcessor myBulkExportProcessor;
private IBulkExportProcessor<JpaPid> myBulkExportProcessor;
@InjectMocks
private FetchResourceIdsStep myFirstStep;
@ -69,37 +71,42 @@ public class FetchResourceIdsStepTest {
ourLog.detachAppender(myAppender);
}
private BulkExportJobParameters createParameters() {
private BulkExportJobParameters createParameters(boolean thePartitioned) {
BulkExportJobParameters jobParameters = new BulkExportJobParameters();
jobParameters.setStartDate(new Date());
jobParameters.setOutputFormat("json");
jobParameters.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
jobParameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
if (thePartitioned) {
jobParameters.setPartitionId(RequestPartitionId.fromPartitionName("Partition-A"));
} else {
jobParameters.setPartitionId(RequestPartitionId.allPartitions());
}
return jobParameters;
}
private StepExecutionDetails<BulkExportJobParameters, VoidModel> createInput(BulkExportJobParameters theParameters,
JobInstance theInstance) {
StepExecutionDetails<BulkExportJobParameters, VoidModel> input = new StepExecutionDetails<>(
return new StepExecutionDetails<>(
theParameters,
null,
theInstance,
"1"
);
return input;
}
@Test
public void run_withValidInputs_succeeds() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void run_withValidInputs_succeeds(boolean thePartitioned) {
// setup
IJobDataSink<ResourceIdList> sink = mock(IJobDataSink.class);
BulkExportJobParameters parameters = createParameters();
BulkExportJobParameters parameters = createParameters(thePartitioned);
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
StepExecutionDetails<BulkExportJobParameters, VoidModel> input = createInput(parameters, instance);
ourLog.setLevel(Level.INFO);
List<IResourcePersistentId> patientIds = new ArrayList<>();
List<IResourcePersistentId> observationIds = new ArrayList<>();
List<JpaPid> patientIds = new ArrayList<>();
List<JpaPid> observationIds = new ArrayList<>();
{
JpaPid id1 = JpaPid.fromId(123L);
@ -133,9 +140,7 @@ public class FetchResourceIdsStepTest {
List<ResourceIdList> results = resultCaptor.getAllValues();
assertEquals(parameters.getResourceTypes().size(), results.size());
for (int i = 0; i < results.size(); i++) {
ResourceIdList idList = results.get(i);
for (ResourceIdList idList: results) {
String resourceType = idList.getResourceType();
assertTrue(parameters.getResourceTypes().contains(resourceType));
@ -161,6 +166,12 @@ public class FetchResourceIdsStepTest {
+ parameters.getResourceTypes().size()
+ " groups of ids for processing"
));
ArgumentCaptor<ExportPIDIteratorParameters> mapppedParamsCaptor = ArgumentCaptor.forClass(ExportPIDIteratorParameters.class);
verify(myBulkExportProcessor, times(2)).getResourcePidIterator(mapppedParamsCaptor.capture());
List<ExportPIDIteratorParameters> capturedParameters = mapppedParamsCaptor.getAllValues();
assertEquals(parameters.getPartitionId(), capturedParameters.get(0).getPartitionIdOrAllPartitions());
assertEquals(parameters.getPartitionId(), capturedParameters.get(1).getPartitionIdOrAllPartitions());
}
@Test
@ -169,11 +180,11 @@ public class FetchResourceIdsStepTest {
IJobDataSink<ResourceIdList> sink = mock(IJobDataSink.class);
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
BulkExportJobParameters parameters = createParameters();
BulkExportJobParameters parameters = createParameters(false);
parameters.setResourceTypes(Collections.singletonList("Patient"));
StepExecutionDetails<BulkExportJobParameters, VoidModel> input = createInput(parameters, instance);
ourLog.setLevel(Level.INFO);
List<IResourcePersistentId> patientIds = new ArrayList<>();
List<JpaPid> patientIds = new ArrayList<>();
// when
int maxFileCapacity = 5;
@ -202,7 +213,7 @@ public class FetchResourceIdsStepTest {
// verify all submitted ids are there
boolean found = false;
for (IResourcePersistentId pid : patientIds) {
for (JpaPid pid : patientIds) {
BatchResourceId batchResourceId = BatchResourceId.getIdFromPID(pid, "Patient");
for (ResourceIdList idList : listIds) {
found = idList.getIds().contains(batchResourceId);

View File

@ -10,10 +10,12 @@ import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
@ -25,6 +27,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -97,21 +101,32 @@ public class WriteBinaryStepTest {
}
private StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> createInput(ExpandedResourcesList theData,
JobInstance theInstance) {
JobInstance theInstance,
boolean thePartitioned) {
BulkExportJobParameters parameters = new BulkExportJobParameters();
parameters.setStartDate(new Date());
parameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = new StepExecutionDetails<>(
parameters.setPartitionId(getPartitionId(thePartitioned));
return new StepExecutionDetails<>(
parameters,
theData,
theInstance,
"1"
);
return input;
}
@Test
public void run_validInputNoErrors_succeeds() {
private RequestPartitionId getPartitionId(boolean thePartitioned) {
if (thePartitioned) {
return RequestPartitionId.fromPartitionName("Partition-A");
} else {
return RequestPartitionId.defaultPartition();
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void run_validInputNoErrors_succeeds(boolean thePartitioned) {
// setup
ExpandedResourcesList expandedResources = new ExpandedResourcesList();
JobInstance instance = new JobInstance();
@ -121,7 +136,7 @@ public class WriteBinaryStepTest {
expandedResources.setResourceType("Patient");
IFhirResourceDao<IBaseBinary> binaryDao = mock(IFhirResourceDao.class);
IJobDataSink<BulkExportBinaryFileId> sink = mock(IJobDataSink.class);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance, thePartitioned);
IIdType binaryId = new IdType("Binary/123");
DaoMethodOutcome methodOutcome = new DaoMethodOutcome();
methodOutcome.setId(binaryId);
@ -139,8 +154,9 @@ public class WriteBinaryStepTest {
assertEquals(new RunOutcome(stringified.size()).getRecordsProcessed(), outcome.getRecordsProcessed());
ArgumentCaptor<IBaseBinary> binaryCaptor = ArgumentCaptor.forClass(IBaseBinary.class);
ArgumentCaptor<SystemRequestDetails> binaryDaoCreateRequestDetailsCaptor = ArgumentCaptor.forClass(SystemRequestDetails.class);
verify(binaryDao)
.create(binaryCaptor.capture(), any(RequestDetails.class));
.create(binaryCaptor.capture(), binaryDaoCreateRequestDetailsCaptor.capture());
String outputString = new String(binaryCaptor.getValue().getContent());
// post-pending a \n (as this is what the binary does)
String expected = String.join("\n", stringified) + "\n";
@ -149,6 +165,9 @@ public class WriteBinaryStepTest {
outputString,
outputString + " != " + expected
);
if (thePartitioned) {
assertEquals(getPartitionId(thePartitioned), binaryDaoCreateRequestDetailsCaptor.getValue().getRequestPartitionId());
}
ArgumentCaptor<BulkExportBinaryFileId> fileIdArgumentCaptor = ArgumentCaptor.forClass(BulkExportBinaryFileId.class);
verify(sink)
@ -168,7 +187,7 @@ public class WriteBinaryStepTest {
expandedResources.setResourceType("Patient");
IFhirResourceDao<IBaseBinary> binaryDao = mock(IFhirResourceDao.class);
IJobDataSink<BulkExportBinaryFileId> sink = mock(IJobDataSink.class);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance, false);
ourLog.setLevel(Level.ERROR);

View File

@ -1,10 +1,16 @@
package ca.uhn.fhir.batch2.jobs.services;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2BaseJobParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
@ -21,11 +27,17 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Date;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class Batch2JobRunnerImplTest {
@ -95,6 +107,13 @@ public class Batch2JobRunnerImplTest {
BulkExportParameters parameters = new BulkExportParameters(Batch2JobDefinitionConstants.BULK_EXPORT);
parameters.setResourceTypes(Collections.singletonList("Patient"));
// when
String jobInstanceId = "test_job_instance";
Date end = new Date();
Date start = new Date(end.getTime()-100);
JobInstance mockJobInstance = createMockJobInstance(parameters, jobInstanceId, start, end);
when(myJobCoordinator.getInstance(eq(jobInstanceId))).thenReturn(mockJobInstance);
// test
myJobRunner.startNewJob(parameters);
@ -106,5 +125,72 @@ public class Batch2JobRunnerImplTest {
// we need to verify something in the parameters
ourLog.info(val.getParameters());
assertTrue(val.getParameters().contains("Patient"));
assertFalse(val.getParameters().contains("allPartitions"));
assertFalse(val.getParameters().contains("Partition-A"));
Batch2JobInfo jobInfo = myJobRunner.getJobInfo(jobInstanceId);
verifyBatch2JobInfo(jobInfo, jobInstanceId, start, end, null);
}
private JobInstance createMockJobInstance(BulkExportParameters theParameters, String theJobInstanceId, Date start, Date end) {
JobInstance mockJobInstance = new JobInstance();
mockJobInstance.setInstanceId(theJobInstanceId);
mockJobInstance.setStatus(StatusEnum.COMPLETED);
mockJobInstance.setCancelled(false);
mockJobInstance.setStartTime(start);
mockJobInstance.setEndTime(end);
mockJobInstance.setReport("test report");
mockJobInstance.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
mockJobInstance.setParameters(BulkExportJobParameters.createFromExportJobParameters(theParameters));
return mockJobInstance;
}
private void verifyBatch2JobInfo(Batch2JobInfo jobInfo, String theJobId, Date start, Date end, RequestPartitionId partitionId) {
assertEquals(jobInfo.getJobId(), theJobId );
assertFalse(jobInfo.isCancelled());
assertEquals(jobInfo.getStartTime(), start);
assertEquals(jobInfo.getEndTime(), end);
assertEquals(jobInfo.getReport(), "test report");
assertEquals(jobInfo.getStatus(), BulkExportJobStatusEnum.COMPLETE);
if (partitionId != null) {
assertEquals(jobInfo.getRequestPartitionId(), partitionId);
} else {
assertNull(jobInfo.getRequestPartitionId());
}
}
@Test
public void startJob_bulkExport_partitioned() {
// setup
BulkExportParameters parameters = new BulkExportParameters(Batch2JobDefinitionConstants.BULK_EXPORT);
parameters.setResourceTypes(Collections.singletonList("Patient"));
RequestPartitionId partitionId = RequestPartitionId.fromPartitionName("Partition-A");
parameters.setPartitionId(partitionId);
// when
String jobInstanceId = "test_job_instance";
Date end = new Date();
Date start = new Date(end.getTime()-100);
JobInstance mockJobInstance = createMockJobInstance(parameters, jobInstanceId, start, end);
when(myJobCoordinator.getInstance(eq(jobInstanceId))).thenReturn(mockJobInstance);
// test
myJobRunner.startNewJob(parameters);
// verify
ArgumentCaptor<JobInstanceStartRequest> captor = ArgumentCaptor.forClass(JobInstanceStartRequest.class);
verify(myJobCoordinator)
.startInstance(captor.capture());
JobInstanceStartRequest val = captor.getValue();
// we need to verify something in the parameters
ourLog.info(val.getParameters());
assertTrue(val.getParameters().contains("Patient"));
assertTrue(val.getParameters().contains("Partition-A"));
assertTrue(val.getParameters().contains("\"allPartitions\":false"));
Batch2JobInfo jobInfo = myJobRunner.getJobInfo(jobInstanceId);
verifyBatch2JobInfo(jobInfo, jobInstanceId, start, end, partitionId);
}
}

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.api.model;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import java.util.Date;
@ -46,6 +47,8 @@ public class Batch2JobInfo {
// the output report (stringified json of whatever the reduction step outputs)
private String myReport;
private RequestPartitionId myRequestPartitionId;
public String getJobId() {
return myJobId;
}
@ -101,4 +104,12 @@ public class Batch2JobInfo {
public void setErrorMsg(String theErrorMsg) {
myErrorMsg = theErrorMsg;
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}
}

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.api.model;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.batch.models.Batch2BaseJobParameters;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
@ -80,6 +81,11 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
*/
private String myOriginalRequestUrl;
/**
* The partition for the request if applicable.
*/
private RequestPartitionId myPartitionId;
public boolean isExpandMdm() {
return myExpandMdm;
}
@ -158,4 +164,12 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
public void setOriginalRequestUrl(String theOriginalRequestUrl) {
myOriginalRequestUrl = theOriginalRequestUrl;
}
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public void setPartitionId(RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
}
}

View File

@ -19,6 +19,7 @@
*/
package ca.uhn.fhir.jpa.bulk.export.model;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@ -72,6 +73,11 @@ public class ExportPIDIteratorParameters {
*/
private List<String> myPatientIds;
/**
* The partition id
*/
private RequestPartitionId myPartitionId;
public String getResourceType() {
return myResourceType;
}
@ -136,6 +142,18 @@ public class ExportPIDIteratorParameters {
myPatientIds = thePatientIds;
}
public RequestPartitionId getPartitionIdOrAllPartitions() {
if (myPartitionId != null) {
return myPartitionId;
} else {
return RequestPartitionId.allPartitions();
}
}
public void setPartitionId(RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
@ -35,6 +36,7 @@ import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.annotation.IdParam;
@ -110,6 +112,9 @@ public class BulkDataExportProvider {
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperService;
/**
* $export
*/
@ -153,6 +158,11 @@ public class BulkDataExportProvider {
parameters.setResourceTypes(resourceTypes);
}
// Determine and validate partition permissions (if needed).
RequestPartitionId partitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, null);
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
parameters.setPartitionId(partitionId);
// start job
Batch2JobStartResponse response = myJobRunner.startNewJob(parameters);
@ -173,25 +183,7 @@ public class BulkDataExportProvider {
}
private String getServerBase(ServletRequestDetails theRequestDetails) {
if (theRequestDetails.getCompleteUrl().contains(theRequestDetails.getServerBaseForRequest())) {
// Base URL not Fixed
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
} else {
// Base URL Fixed
int index = StringUtils.indexOf(theRequestDetails.getCompleteUrl(), theRequestDetails.getOperation());
if (index == -1) {
return null;
}
return theRequestDetails.getCompleteUrl().substring(0, index - 1);
}
}
private String getDefaultPartitionServerBase(ServletRequestDetails theRequestDetails) {
if (theRequestDetails.getTenantId() == null || theRequestDetails.getTenantId().equals(JpaConstants.DEFAULT_PARTITION_NAME)) {
return getServerBase(theRequestDetails);
} else {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest().replace(theRequestDetails.getTenantId(), JpaConstants.DEFAULT_PARTITION_NAME), "/");
}
}
/**
@ -313,6 +305,15 @@ public class BulkDataExportProvider {
throw new ResourceNotFoundException(Msg.code(2040) + "Unknown instance ID: " + theJobId + ". Please check if the input job ID is valid.");
}
if(info.getRequestPartitionId() != null) {
// Determine and validate permissions for partition (if needed)
RequestPartitionId partitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, null);
myRequestPartitionHelperService.validateHasPartitionPermissions(theRequestDetails, "Binary", partitionId);
if(!info.getRequestPartitionId().equals(partitionId)){
throw new InvalidRequestException(Msg.code(2304) + "Invalid partition in request for Job ID " + theJobId);
}
}
switch (info.getStatus()) {
case COMPLETE:
if (theRequestDetails.getRequestType() == RequestTypeEnum.DELETE) {
@ -338,7 +339,7 @@ public class BulkDataExportProvider {
bulkResponseDocument.setMsg(results.getReportMsg());
bulkResponseDocument.setRequest(results.getOriginalRequestUrl());
String serverBase = getDefaultPartitionServerBase(theRequestDetails);
String serverBase = getServerBase(theRequestDetails);
for (Map.Entry<String, List<String>> entrySet : results.getResourceTypeToBinaryIds().entrySet()) {
String resourceType = entrySet.getKey();