4182 convert $mdm submit to a batch job (#4188)

* Changelog

* Wip implementation

* Wip

* wip

* wip

* Fix validator

* Fix up implementation

* tidfy

* TIdy up tests

* Add some docs

* Add changelog info:

* Tidy up self review

* License files

* Update hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_mdm/mdm_operations.md

Co-authored-by: Ken Stevens <khstevens@gmail.com>

* Review changes

* fixes

* add new method

* Update tests

* Add batch size parameter

Co-authored-by: Ken Stevens <khstevens@gmail.com>
This commit is contained in:
Tadgh 2022-10-25 12:02:04 -07:00 committed by GitHub
parent fb3512df1e
commit 78ce2a6344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 611 additions and 179 deletions

View File

@ -0,0 +1,4 @@
---
type: add
issue: 4182
title: "`$mdm-submit` can now be run as a batch job, which will return a job ID, and can be polled for status. This can be accomplished by sending a `Prefer: respond-async` header with the request."

View File

@ -718,6 +718,10 @@ After the operation is complete, all resources that matched the criteria will no
This operation takes a single optional criteria parameter unless it is called on a specific instance.
Note that this operation can take a long time on large data sets. In order to support large data sets, the operation can be run asynchronously. This can be done by
sending the `Prefer: respond-async` header with the request. This will cause HAPI-FHIR to execute the request as a batch job. The response will contain a `jobId` parameter that can be used to poll the status of the operation. Note that completion of the job indicates completion of loading all the resources onto the broker,
not necessarily the completion of the actual underlying MDM process.
<table class="table table-striped table-condensed">
<thead>
<tr>
@ -773,9 +777,13 @@ This operation returns the number of resources that were submitted for MDM proce
}
```
This operation can also be done at the Instance level. When this is the case, the operations accepts no parameters. The following are examples of Instance level POSTs, which require no parameters.
```url
http://example.com/Patient/123/$mdm-submit
http://example.com/Practitioner/456/$mdm-submit
```

View File

@ -1,81 +0,0 @@
package ca.uhn.fhir.jpa.search.helper;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class SearchParamHelper {
@Autowired
private FhirContext myFhirContext;
public Collection<RuntimeSearchParam> getPatientSearchParamsForResourceType(String theResourceType) {
RuntimeResourceDefinition runtimeResourceDefinition = myFhirContext.getResourceDefinition(theResourceType);
Map<String, RuntimeSearchParam> searchParams = new HashMap<>();
RuntimeSearchParam patientSearchParam = runtimeResourceDefinition.getSearchParam("patient");
if (patientSearchParam != null) {
searchParams.put(patientSearchParam.getName(), patientSearchParam);
}
RuntimeSearchParam subjectSearchParam = runtimeResourceDefinition.getSearchParam("subject");
if (subjectSearchParam != null) {
searchParams.put(subjectSearchParam.getName(), subjectSearchParam);
}
List<RuntimeSearchParam> compartmentSearchParams = getPatientCompartmentRuntimeSearchParams(runtimeResourceDefinition);
compartmentSearchParams.forEach(param -> searchParams.put(param.getName(), param));
return searchParams.values();
}
/**
* Search the resource definition for a compartment named 'patient' and return its related Search Parameter.
*/
public List<RuntimeSearchParam> getPatientCompartmentRuntimeSearchParams(RuntimeResourceDefinition runtimeResourceDefinition) {
List<RuntimeSearchParam> patientSearchParam = new ArrayList<>();
List<RuntimeSearchParam> searchParams = runtimeResourceDefinition.getSearchParamsForCompartmentName("Patient");
return searchParams;
// if (searchParams == null || searchParams.size() == 0) {
// String errorMessage = String.format("Resource type [%s] is not eligible for this type of export, as it contains no Patient compartment, and no `patient` or `subject` search parameter", myResourceType);
// throw new IllegalArgumentException(Msg.code(1264) + errorMessage);
// } else if (searchParams.size() == 1) {
// patientSearchParam = searchParams.get(0);
// } else {
// String errorMessage = String.format("Resource type [%s] is not eligible for Group Bulk export, as we are unable to disambiguate which patient search parameter we should be searching by.", myResourceType);
// throw new IllegalArgumentException(Msg.code(1265) + errorMessage);
// }
// return patientSearchParam;
}
}

View File

@ -38,6 +38,8 @@ import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.api.paging.MdmPageRequest;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import ca.uhn.fhir.mdm.batch2.submit.MdmSubmitAppCtx;
import ca.uhn.fhir.mdm.batch2.submit.MdmSubmitJobParameters;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.mdm.provider.MdmControllerHelper;
import ca.uhn.fhir.mdm.provider.MdmControllerUtil;
@ -196,6 +198,31 @@ public class MdmControllerSvcImpl implements IMdmControllerSvc {
return retVal;
}
@Override
public IBaseParameters submitMdmSubmitJob(List<String> theUrls, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails) {
MdmSubmitJobParameters params = new MdmSubmitJobParameters();
if (theBatchSize != null && theBatchSize.getValue() !=null && theBatchSize.getValue().longValue() > 0) {
params.setBatchSize(theBatchSize.getValue().intValue());
}
ReadPartitionIdRequestDetails details= new ReadPartitionIdRequestDetails(null, RestOperationTypeEnum.EXTENDED_OPERATION_SERVER, null, null, null);
params.setRequestPartitionId(RequestPartitionId.allPartitions());
theUrls.forEach(params::addUrl);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setParameters(params);
request.setJobDefinitionId(MdmSubmitAppCtx.MDM_SUBMIT_JOB);
Batch2JobStartResponse batch2JobStartResponse = myJobCoordinator.startInstance(request);
String id = batch2JobStartResponse.getJobId();
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
ParametersUtil.addParameterToParametersString(myFhirContext, retVal, ProviderConstants.OPERATION_BATCH_RESPONSE_JOB_ID, id);
return retVal;
}
@Override
public void notDuplicateGoldenResource(String theGoldenResourceId, String theTargetGoldenResourceId, MdmTransactionContext theMdmTransactionContext) {
IAnyResource goldenResource = myMdmControllerHelper.getLatestGoldenResourceFromIdOrThrowException(ProviderConstants.MDM_UPDATE_LINK_GOLDEN_RESOURCE_ID, theGoldenResourceId);

View File

@ -3,9 +3,11 @@ package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.test.concurrency.PointcutLatch;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.r4.model.IdType;
@ -15,16 +17,28 @@ import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.Servlet;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MdmProviderBatchR4Test extends BaseLinkR4Test {
public static final String ORGANIZATION_DUMMY = "Organization/dummy";
@ -44,6 +58,16 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
PointcutLatch afterMdmLatch = new PointcutLatch(Pointcut.MDM_AFTER_PERSISTED_RESOURCE_CHECKED);
public static Stream<Arguments> requestTypes() {
ServletRequestDetails asyncSrd = mock(ServletRequestDetails.class);
when(asyncSrd.getHeader("Prefer")).thenReturn("respond-async");
ServletRequestDetails syncSrd = mock(ServletRequestDetails.class);
return Stream.of(
Arguments.of(Named.of("Asynchronous Request", asyncSrd)),
Arguments.of(Named.of("Synchronous Request", syncSrd))
);
}
@Override
@BeforeEach
public void before() throws Exception {
@ -74,21 +98,23 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
super.after();
}
@Test
public void testBatchRunOnAllMedications() throws InterruptedException {
@ParameterizedTest
@MethodSource("requestTypes")
public void testBatchRunOnAllMedications(ServletRequestDetails theSyncOrAsyncRequest) throws InterruptedException {
StringType criteria = null;
clearMdmLinks();
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchOnAllSourceResources(new StringType("Medication"), criteria, null));
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchOnAllSourceResources(new StringType("Medication"), criteria, null, theSyncOrAsyncRequest));
assertLinkCount(1);
}
@Test
public void testBatchRunOnAllPractitioners() throws InterruptedException {
@ParameterizedTest
@MethodSource("requestTypes")
public void testBatchRunOnAllPractitioners(ServletRequestDetails theSyncOrAsyncRequest) throws InterruptedException {
StringType criteria = null;
clearMdmLinks();
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchPractitionerType(criteria, null));
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchPractitionerType(criteria, null, theSyncOrAsyncRequest));
assertLinkCount(1);
}
@Test
@ -108,12 +134,13 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
}
}
@Test
public void testBatchRunOnAllPatients() throws InterruptedException {
@ParameterizedTest
@MethodSource("requestTypes")
public void testBatchRunOnAllPatients(ServletRequestDetails theSyncOrAsyncRequest) throws InterruptedException {
assertLinkCount(3);
StringType criteria = null;
clearMdmLinks();
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchPatientType(criteria, null));
afterMdmLatch.runWithExpectedCount(1, () -> myMdmProvider.mdmBatchPatientType(criteria, null, theSyncOrAsyncRequest));
assertLinkCount(1);
}
@ -136,29 +163,33 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
}
}
@Tag("intermittent")
// @Test
public void testBatchRunOnAllTypes() throws InterruptedException {
@ParameterizedTest
@MethodSource("requestTypes")
public void testBatchRunOnAllTypes(ServletRequestDetails theSyncOrAsyncRequest) throws InterruptedException {
assertLinkCount(3);
StringType criteria = new StringType("");
clearMdmLinks();
afterMdmLatch.runWithExpectedCount(3, () -> {
myMdmProvider.mdmBatchOnAllSourceResources(null, criteria, null);
myMdmProvider.mdmBatchOnAllSourceResources(null, criteria, null, theSyncOrAsyncRequest);
});
assertLinkCount(3);
}
@Test
public void testBatchRunOnAllTypesWithInvalidCriteria() {
@ParameterizedTest
@MethodSource("requestTypes")
public void testBatchRunOnAllTypesWithInvalidCriteria(ServletRequestDetails theSyncOrAsyncRequest) {
assertLinkCount(3);
StringType criteria = new StringType("death-date=2020-06-01");
clearMdmLinks();
try {
myMdmProvider.mdmBatchPractitionerType(criteria, null);
myMdmProvider.mdmBatchOnAllSourceResources(null, criteria , null, theSyncOrAsyncRequest);
fail();
} catch (InvalidRequestException e) {
assertThat(e.getMessage(), is(equalTo(Msg.code(488) + "Failed to parse match URL[death-date=2020-06-01] - Resource type Practitioner does not have a parameter with name: death-date")));
assertThat(e.getMessage(), either(
containsString(Msg.code(2039) + "Failed to validate parameters for job"))//Async case
.or(containsString(Msg.code(488) + "Failed to parse match URL")));// Sync case
}
}
}

View File

@ -4,6 +4,7 @@
<level>TRACE</level>
</filter>
<encoder>
<!--N.B use this pattern to remove timestamp/thread/level/logger information from logs during testing.<pattern>[%file:%line] %msg%n</pattern>-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] %msg%n</pattern>
</encoder>
</appender>
@ -71,6 +72,11 @@
<appender-ref ref="MDM_TROUBLESHOOTING"/>
</logger>
<!-- <logger name="ca.uhn.fhir.log.batch_troubleshooting" level="DEBUG" additivity="false">-->
<!-- <appender-ref ref="STDOUT"/>-->
<!-- </logger>-->
<root level="info">
<appender-ref ref="STDOUT" />
</root>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.searchparam.util;
/*-
* #%L
* HAPI FHIR Search Parameters
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParameterCanonicalizer;

View File

@ -116,7 +116,7 @@
<scope>test</scope>
</dependency>
</dependencies>
</dependencies>
<build>
<pluginManagement>

View File

@ -55,4 +55,5 @@ public interface IMdmControllerSvc {
IAnyResource createLink(String theGoldenResourceId, String theSourceResourceId, @Nullable String theMatchResult, MdmTransactionContext theMdmTransactionContext);
IBaseParameters submitMdmClearJob(List<String> theResourceNames, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails);
IBaseParameters submitMdmSubmitJob(List<String> theUrls, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails);
}

View File

@ -49,8 +49,10 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import javax.annotation.Nonnull;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -78,7 +80,8 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
IMdmControllerSvc theMdmControllerSvc,
MdmControllerHelper theMdmHelper,
IMdmSubmitSvc theMdmSubmitSvc,
IMdmSettings theIMdmSettings) {
IMdmSettings theIMdmSettings
) {
super(theFhirContext);
myMdmControllerSvc = theMdmControllerSvc;
myMdmControllerHelper = theMdmHelper;
@ -148,7 +151,7 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
@OperationParam(name = ProviderConstants.OPERATION_BATCH_RESPONSE_JOB_ID, typeName = "decimal")
})
public IBaseParameters clearMdmLinks(@OperationParam(name = ProviderConstants.OPERATION_MDM_CLEAR_RESOURCE_NAME, min = 0, max = OperationParam.MAX_UNLIMITED, typeName = "string") List<IPrimitiveType<String>> theResourceNames,
@OperationParam(name = ProviderConstants.OPERATION_MDM_CLEAR_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
@OperationParam(name = ProviderConstants.OPERATION_MDM_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
ServletRequestDetails theRequestDetails) {
List<String> resourceNames = new ArrayList<>();
@ -234,20 +237,43 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
public IBaseParameters mdmBatchOnAllSourceResources(
@OperationParam(name = ProviderConstants.MDM_BATCH_RUN_RESOURCE_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theResourceType,
@OperationParam(name = ProviderConstants.MDM_BATCH_RUN_CRITERIA, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theCriteria,
@OperationParam(name = ProviderConstants.OPERATION_MDM_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
ServletRequestDetails theRequestDetails) {
String criteria = convertStringTypeToString(theCriteria);
String resourceType = convertStringTypeToString(theResourceType);
long submittedCount;
if (resourceType != null) {
submittedCount = myMdmSubmitSvc.submitSourceResourceTypeToMdm(resourceType, criteria, theRequestDetails);
if (theRequestDetails.isPreferRespondAsync()) {
List<String> urls = buildUrlsForJob(criteria, resourceType);
return myMdmControllerSvc.submitMdmSubmitJob(urls, theBatchSize, theRequestDetails);
} else {
submittedCount = myMdmSubmitSvc.submitAllSourceTypesToMdm(criteria, theRequestDetails);
if (StringUtils.isNotBlank(resourceType)) {
submittedCount = myMdmSubmitSvc.submitSourceResourceTypeToMdm(resourceType, criteria, theRequestDetails);
} else {
submittedCount = myMdmSubmitSvc.submitAllSourceTypesToMdm(criteria, theRequestDetails);
}
return buildMdmOutParametersWithCount(submittedCount);
}
return buildMdmOutParametersWithCount(submittedCount);
}
@Nonnull
private List<String> buildUrlsForJob(String criteria, String resourceType) {
List<String> urls = new ArrayList<>();
if (StringUtils.isNotBlank(resourceType)) {
String theUrl = resourceType + "?" + criteria;
urls.add(theUrl);
} else {
myMdmSettings.getMdmRules().getMdmTypes()
.stream()
.map(type -> type + "?" + criteria)
.forEach(urls::add);
}
return urls;
}
private String convertStringTypeToString(IPrimitiveType<String> theCriteria) {
return theCriteria == null ? null : theCriteria.getValueAsString();
return theCriteria == null ? "" : theCriteria.getValueAsString();
}
@ -266,10 +292,16 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
})
public IBaseParameters mdmBatchPatientType(
@OperationParam(name = ProviderConstants.MDM_BATCH_RUN_CRITERIA, typeName = "string") IPrimitiveType<String> theCriteria,
RequestDetails theRequest) {
String criteria = convertStringTypeToString(theCriteria);
long submittedCount = myMdmSubmitSvc.submitPatientTypeToMdm(criteria, theRequest);
return buildMdmOutParametersWithCount(submittedCount);
@OperationParam(name = ProviderConstants.OPERATION_MDM_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
ServletRequestDetails theRequest) {
if (theRequest.isPreferRespondAsync()) {
String theUrl = "Patient?";
return myMdmControllerSvc.submitMdmSubmitJob(Collections.singletonList(theUrl), theBatchSize, theRequest);
} else {
String criteria = convertStringTypeToString(theCriteria);
long submittedCount = myMdmSubmitSvc.submitPatientTypeToMdm(criteria, theRequest);
return buildMdmOutParametersWithCount(submittedCount);
}
}
@Operation(name = ProviderConstants.OPERATION_MDM_SUBMIT, idempotent = false, typeName = "Practitioner", returnParameters = {
@ -287,10 +319,16 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
})
public IBaseParameters mdmBatchPractitionerType(
@OperationParam(name = ProviderConstants.MDM_BATCH_RUN_CRITERIA, typeName = "string") IPrimitiveType<String> theCriteria,
RequestDetails theRequest) {
String criteria = convertStringTypeToString(theCriteria);
long submittedCount = myMdmSubmitSvc.submitPractitionerTypeToMdm(criteria, theRequest);
return buildMdmOutParametersWithCount(submittedCount);
@OperationParam(name = ProviderConstants.OPERATION_MDM_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
ServletRequestDetails theRequest) {
if (theRequest.isPreferRespondAsync()) {
String theUrl = "Practitioner?";
return myMdmControllerSvc.submitMdmSubmitJob(Collections.singletonList(theUrl), theBatchSize, theRequest);
} else {
String criteria = convertStringTypeToString(theCriteria);
long submittedCount = myMdmSubmitSvc.submitPractitionerTypeToMdm(criteria, theRequest);
return buildMdmOutParametersWithCount(submittedCount);
}
}
/**

View File

@ -57,7 +57,8 @@ public class MdmProviderLoader {
myMdmControllerSvc,
myMdmControllerHelper,
myMdmSubmitSvc,
myMdmSettings));
myMdmSettings
));
break;
default:
throw new ConfigurationException(Msg.code(1497) + "MDM not supported for FHIR version " + myFhirContext.getVersion().getVersion());

View File

@ -97,7 +97,7 @@ public class ProviderConstants {
public static final String OPERATION_MDM_CLEAR = "$mdm-clear";
public static final String OPERATION_MDM_CLEAR_RESOURCE_NAME = "resourceType";
public static final String OPERATION_MDM_CLEAR_BATCH_SIZE = "batchSize";
public static final String OPERATION_MDM_BATCH_SIZE = "batchSize";
public static final String OPERATION_MDM_SUBMIT = "$mdm-submit";
public static final String OPERATION_MDM_SUBMIT_OUT_PARAM_SUBMITTED = "submitted";
public static final String MDM_BATCH_RUN_CRITERIA = "criteria";

View File

@ -24,8 +24,10 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.PreferHeader;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.Validate;
@ -206,4 +208,13 @@ public class ServletRequestDetails extends RequestDetails {
return Collections.unmodifiableMap(retVal);
}
/**
* Returns true if the `Prefer` header contains a value of `respond-async`
*/
public boolean isPreferRespondAsync() {
String preferHeader = getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
return prefer.getRespondAsync();
}
}

View File

@ -38,7 +38,8 @@ import org.springframework.context.annotation.Import;
DeleteExpungeAppCtx.class,
BulkExportAppCtx.class,
TermCodeSystemJobConfig.class,
BulkImportPullConfig.class
BulkImportPullConfig.class,
})
public class Batch2JobsConfig {
}

View File

@ -22,8 +22,8 @@ package ca.uhn.fhir.batch2.jobs.export;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportExpandedResources;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportIdList;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
@ -51,14 +51,14 @@ public class BulkExportAppCtx {
.addFirstStep(
"fetch-resources",
"Fetches resource PIDs for exporting",
BulkExportIdList.class,
ResourceIdList.class,
fetchResourceIdsStep()
)
// expand out - fetch resources
.addIntermediateStep(
"expand-resources",
"Expand out resources",
BulkExportExpandedResources.class,
ExpandedResourcesList.class,
expandResourcesStep()
)
// write binaries and save to db

View File

@ -25,8 +25,8 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportExpandedResources;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportIdList;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.models.Id;
import ca.uhn.fhir.context.FhirContext;
@ -38,20 +38,17 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParameters, BulkExportIdList, BulkExportExpandedResources> {
public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParameters, ResourceIdList, ExpandedResourcesList> {
private static final Logger ourLog = getLogger(ExpandResourcesStep.class);
@Autowired
@ -68,9 +65,9 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, BulkExportIdList> theStepExecutionDetails,
@Nonnull IJobDataSink<BulkExportExpandedResources> theDataSink) throws JobExecutionFailedException {
BulkExportIdList idList = theStepExecutionDetails.getData();
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, ResourceIdList> theStepExecutionDetails,
@Nonnull IJobDataSink<ExpandedResourcesList> theDataSink) throws JobExecutionFailedException {
ResourceIdList idList = theStepExecutionDetails.getData();
BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters();
ourLog.info("Step 2 for bulk export - Expand resources");
@ -95,7 +92,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
// set to datasink
for (String nextResourceType : resources.keySet()) {
BulkExportExpandedResources output = new BulkExportExpandedResources();
ExpandedResourcesList output = new ExpandedResourcesList();
output.setStringifiedResources(resources.get(nextResourceType));
output.setResourceType(nextResourceType);
theDataSink.accept(output);
@ -111,7 +108,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
return RunOutcome.SUCCESS;
}
private List<IBaseResource> fetchAllResources(BulkExportIdList theIds) {
private List<IBaseResource> fetchAllResources(ResourceIdList theIds) {
List<IBaseResource> resources = new ArrayList<>();
for (Id id : theIds.getIds()) {

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportIdList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.models.Id;
import ca.uhn.fhir.i18n.Msg;
@ -45,7 +45,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobParameters, BulkExportIdList> {
public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobParameters, ResourceIdList> {
private static final Logger ourLog = LoggerFactory.getLogger(FetchResourceIdsStep.class);
public static final int MAX_IDS_TO_BATCH = 900;
@ -59,7 +59,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<BulkExportIdList> theDataSink) throws JobExecutionFailedException {
@Nonnull IJobDataSink<ResourceIdList> theDataSink) throws JobExecutionFailedException {
BulkExportJobParameters params = theStepExecutionDetails.getParameters();
ourLog.info("Starting BatchExport job");
@ -131,8 +131,8 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
private void submitWorkChunk(List<Id> theIds,
String theResourceType,
BulkExportJobParameters theParams,
IJobDataSink<BulkExportIdList> theDataSink) {
BulkExportIdList idList = new BulkExportIdList();
IJobDataSink<ResourceIdList> theDataSink) {
ResourceIdList idList = new ResourceIdList();
idList.setIds(theIds);

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportExpandedResources;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
@ -49,7 +49,7 @@ import java.io.OutputStreamWriter;
import static org.slf4j.LoggerFactory.getLogger;
public class WriteBinaryStep implements IJobStepWorker<BulkExportJobParameters, BulkExportExpandedResources, BulkExportBinaryFileId> {
public class WriteBinaryStep implements IJobStepWorker<BulkExportJobParameters, ExpandedResourcesList, BulkExportBinaryFileId> {
private static final Logger ourLog = getLogger(WriteBinaryStep.class);
@Autowired
@ -60,10 +60,10 @@ public class WriteBinaryStep implements IJobStepWorker<BulkExportJobParameters,
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, BulkExportExpandedResources> theStepExecutionDetails,
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> theStepExecutionDetails,
@Nonnull IJobDataSink<BulkExportBinaryFileId> theDataSink) throws JobExecutionFailedException {
BulkExportExpandedResources expandedResources = theStepExecutionDetails.getData();
ExpandedResourcesList expandedResources = theStepExecutionDetails.getData();
final int numResourcesProcessed = expandedResources.getStringifiedResources().size();
ourLog.info("Write binary step of Job Export");

View File

@ -21,11 +21,10 @@ package ca.uhn.fhir.batch2.jobs.export.models;
*/
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Multimap;
import java.util.List;
public class BulkExportExpandedResources extends BulkExportJobBase {
public class ExpandedResourcesList extends BulkExportJobBase {
/**
* List of stringified resources ready for writing

View File

@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class BulkExportIdList extends BulkExportJobBase {
public class ResourceIdList extends BulkExportJobBase {
/**
* List of Id objects for serialization

View File

@ -4,8 +4,8 @@ package ca.uhn.fhir.batch2.jobs.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportExpandedResources;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportIdList;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.models.Id;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -66,10 +66,10 @@ public class ExpandResourcesStepTest {
return parameters;
}
private StepExecutionDetails<BulkExportJobParameters, BulkExportIdList> createInput(BulkExportIdList theData,
BulkExportJobParameters theParameters,
JobInstance theInstance) {
StepExecutionDetails<BulkExportJobParameters, BulkExportIdList> input = new StepExecutionDetails<>(
private StepExecutionDetails<BulkExportJobParameters, ResourceIdList> createInput(ResourceIdList theData,
BulkExportJobParameters theParameters,
JobInstance theInstance) {
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> input = new StepExecutionDetails<>(
theParameters,
theData,
theInstance,
@ -90,9 +90,9 @@ public class ExpandResourcesStepTest {
//setup
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
IJobDataSink<BulkExportExpandedResources> sink = mock(IJobDataSink.class);
IJobDataSink<ExpandedResourcesList> sink = mock(IJobDataSink.class);
IFhirResourceDao<?> patientDao = mockOutDaoRegistry();
BulkExportIdList idList = new BulkExportIdList();
ResourceIdList idList = new ResourceIdList();
idList.setResourceType("Patient");
ArrayList<IBaseResource> resources = new ArrayList<>();
ArrayList<Id> ids = new ArrayList<>();
@ -109,7 +109,7 @@ public class ExpandResourcesStepTest {
}
idList.setIds(ids);
StepExecutionDetails<BulkExportJobParameters, BulkExportIdList> input = createInput(
StepExecutionDetails<BulkExportJobParameters, ResourceIdList> input = createInput(
idList,
createParameters(),
instance
@ -125,10 +125,10 @@ public class ExpandResourcesStepTest {
// data sink
ArgumentCaptor<BulkExportExpandedResources> expandedCaptor = ArgumentCaptor.forClass(BulkExportExpandedResources.class);
ArgumentCaptor<ExpandedResourcesList> expandedCaptor = ArgumentCaptor.forClass(ExpandedResourcesList.class);
verify(sink)
.accept(expandedCaptor.capture());
BulkExportExpandedResources expandedResources = expandedCaptor.getValue();
ExpandedResourcesList expandedResources = expandedCaptor.getValue();
assertEquals(resources.size(), expandedResources.getStringifiedResources().size());
// we'll only verify a single element
// but we want to make sure it's as compact as possible

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportIdList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.models.Id;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -91,7 +91,7 @@ public class FetchResourceIdsStepTest {
@Test
public void run_withValidInputs_succeeds() {
// setup
IJobDataSink<BulkExportIdList> sink = mock(IJobDataSink.class);
IJobDataSink<ResourceIdList> sink = mock(IJobDataSink.class);
BulkExportJobParameters parameters = createParameters();
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
@ -126,14 +126,14 @@ public class FetchResourceIdsStepTest {
// verify
assertEquals(RunOutcome.SUCCESS, outcome);
ArgumentCaptor<BulkExportIdList> resultCaptor = ArgumentCaptor.forClass(BulkExportIdList.class);
ArgumentCaptor<ResourceIdList> resultCaptor = ArgumentCaptor.forClass(ResourceIdList.class);
verify(sink, times(parameters.getResourceTypes().size()))
.accept(resultCaptor.capture());
List<BulkExportIdList> results = resultCaptor.getAllValues();
List<ResourceIdList> results = resultCaptor.getAllValues();
assertEquals(parameters.getResourceTypes().size(), results.size());
for (int i = 0; i < results.size(); i++) {
BulkExportIdList idList = results.get(i);
ResourceIdList idList = results.get(i);
String resourceType = idList.getResourceType();
assertTrue(parameters.getResourceTypes().contains(resourceType));
@ -165,7 +165,7 @@ public class FetchResourceIdsStepTest {
@Test
public void run_moreThanTheMaxFileCapacityPatients_hasAtLeastTwoJobs() {
// setup
IJobDataSink<BulkExportIdList> sink = mock(IJobDataSink.class);
IJobDataSink<ResourceIdList> sink = mock(IJobDataSink.class);
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
BulkExportJobParameters parameters = createParameters();
@ -192,18 +192,18 @@ public class FetchResourceIdsStepTest {
RunOutcome outcome = myFirstStep.run(input, sink);
// verify
ArgumentCaptor<BulkExportIdList> captor = ArgumentCaptor.forClass(BulkExportIdList.class);
ArgumentCaptor<ResourceIdList> captor = ArgumentCaptor.forClass(ResourceIdList.class);
assertEquals(RunOutcome.SUCCESS, outcome);
verify(sink, times(2))
.accept(captor.capture());
List<BulkExportIdList> listIds = captor.getAllValues();
List<ResourceIdList> listIds = captor.getAllValues();
// verify all submitted ids are there
boolean found = false;
for (ResourcePersistentId pid : patientIds) {
Id id = Id.getIdFromPID(pid, "Patient");
for (BulkExportIdList idList : listIds) {
for (ResourceIdList idList : listIds) {
found = idList.getIds().contains(id);
if (found) {
break;

View File

@ -6,7 +6,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportExpandedResources;
import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.FhirContext;
@ -18,7 +18,6 @@ import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.google.common.collect.Multimap;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.IdType;
@ -97,12 +96,12 @@ public class WriteBinaryStepTest {
ourLog.detachAppender(myAppender);
}
private StepExecutionDetails<BulkExportJobParameters, BulkExportExpandedResources> createInput(BulkExportExpandedResources theData,
JobInstance theInstance) {
private StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> createInput(ExpandedResourcesList theData,
JobInstance theInstance) {
BulkExportJobParameters parameters = new BulkExportJobParameters();
parameters.setStartDate(new Date());
parameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
StepExecutionDetails<BulkExportJobParameters, BulkExportExpandedResources> input = new StepExecutionDetails<>(
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = new StepExecutionDetails<>(
parameters,
theData,
theInstance,
@ -114,7 +113,7 @@ public class WriteBinaryStepTest {
@Test
public void run_validInputNoErrors_succeeds() {
// setup
BulkExportExpandedResources expandedResources = new BulkExportExpandedResources();
ExpandedResourcesList expandedResources = new ExpandedResourcesList();
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
List<String> stringified = Arrays.asList("first", "second", "third", "forth");
@ -122,7 +121,7 @@ public class WriteBinaryStepTest {
expandedResources.setResourceType("Patient");
IFhirResourceDao<IBaseBinary> binaryDao = mock(IFhirResourceDao.class);
IJobDataSink<BulkExportBinaryFileId> sink = mock(IJobDataSink.class);
StepExecutionDetails<BulkExportJobParameters, BulkExportExpandedResources> input = createInput(expandedResources, instance);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance);
IIdType binaryId = new IdType("Binary/123");
DaoMethodOutcome methodOutcome = new DaoMethodOutcome();
methodOutcome.setId(binaryId);
@ -163,13 +162,13 @@ public class WriteBinaryStepTest {
String testException = "I am an exceptional exception.";
JobInstance instance = new JobInstance();
instance.setInstanceId("1");
BulkExportExpandedResources expandedResources = new BulkExportExpandedResources();
ExpandedResourcesList expandedResources = new ExpandedResourcesList();
List<String> stringified = Arrays.asList("first", "second", "third", "forth");
expandedResources.setStringifiedResources(stringified);
expandedResources.setResourceType("Patient");
IFhirResourceDao<IBaseBinary> binaryDao = mock(IFhirResourceDao.class);
IJobDataSink<BulkExportBinaryFileId> sink = mock(IJobDataSink.class);
StepExecutionDetails<BulkExportJobParameters, BulkExportExpandedResources> input = createInput(expandedResources, instance);
StepExecutionDetails<BulkExportJobParameters, ExpandedResourcesList> input = createInput(expandedResources, instance);
ourLog.setLevel(Level.ERROR);

View File

@ -64,7 +64,11 @@ public class ResourceIdListWorkChunkJson implements IModelJson {
return myTypedPids
.stream()
.map(t -> new ResourcePersistentId(t.getPid()))
.map(t -> {
ResourcePersistentId retval = new ResourcePersistentId(t.getPid());
retval.setResourceType(t.getResourceType());
return retval;
})
.collect(Collectors.toList());
}

View File

@ -29,17 +29,19 @@ import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
import static org.slf4j.LoggerFactory.getLogger;
public class LoadIdsStep implements IJobStepWorker<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson, ResourceIdListWorkChunkJson> {
private final IBatch2DaoSvc myBatch2DaoSvc;
private static final Logger ourLog = getLogger(LoadIdsStep.class);
private final ResourceIdListStep<PartitionedUrlListJobParameters, PartitionedUrlChunkRangeJson> myResourceIdListStep;
public LoadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) {
myBatch2DaoSvc = theBatch2DaoSvc;
IIdChunkProducer<PartitionedUrlChunkRangeJson> idChunkProducer = new PartitionedUrlListIdChunkProducer(theBatch2DaoSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}

View File

@ -129,8 +129,8 @@ public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends
return;
}
ourLog.info("Submitting work chunk with {} IDs", theTypedPids.size());
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(theTypedPids);
ourLog.debug("IDs are: {}", data);
theDataSink.accept(data);
}
}

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx;
import ca.uhn.fhir.mdm.batch2.submit.MdmSubmitAppCtx;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
@ -33,7 +34,10 @@ import javax.annotation.PostConstruct;
import static ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx.MDM_CLEAR_JOB_BEAN_NAME;
@Configuration
@Import({MdmClearAppCtx.class})
@Import({
MdmClearAppCtx.class,
MdmSubmitAppCtx.class
})
public class MdmBatch2Config {
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;

View File

@ -0,0 +1,92 @@
package ca.uhn.fhir.mdm.batch2.submit;
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.*;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.mdm.api.IMdmChannelSubmitterSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
public class MdmInflateAndSubmitResourcesStep implements IJobStepWorker<MdmSubmitJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired(required = false)
private ResponseTerminologyTranslationSvc myResponseTerminologyTranslationSvc;
@Autowired
private IMdmChannelSubmitterSvc myMdmChannelSubmitterSvc;
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<MdmSubmitJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails,
@Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
ResourceIdListWorkChunkJson idList = theStepExecutionDetails.getData();
ourLog.info("Final Step for $mdm-submit - Expand and submit resources");
ourLog.info("About to expand {} resource IDs into their full resource bodies.", idList.getResourcePersistentIds().size());
//Inflate the resources by PID
List<IBaseResource> allResources = fetchAllResources(idList.getResourcePersistentIds());
//Replace the terminology
if (myResponseTerminologyTranslationSvc != null) {
myResponseTerminologyTranslationSvc.processResourcesForTerminologyTranslation(allResources);
}
//Submit
for (IBaseResource nextResource : allResources) {
myMdmChannelSubmitterSvc.submitResourceToMdmChannel(nextResource);
}
ourLog.info("Expanding of {} resources of type completed", idList.size());
return new RunOutcome(allResources.size());
}
private List<IBaseResource> fetchAllResources(List<ResourcePersistentId> theIds) {
List<IBaseResource> resources = new ArrayList<>();
for (ResourcePersistentId id : theIds) {
assert id.getResourceType() != null;
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(id.getResourceType());
// This should be a query, but we have PIDs, and we don't have a _pid search param. TODO GGG, figure out how to make this search by pid.
try {
resources.add(dao.readByPid(id));
} catch (ResourceNotFoundException e) {
ourLog.warn("While attempging to send [{}] to the MDM queue, the resource was not found.", id);
}
}
return resources;
}
}

View File

@ -0,0 +1,82 @@
package ca.uhn.fhir.mdm.batch2.submit;
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MdmSubmitAppCtx {
private static final String MDM_SUBMIT_JOB_BEAN_NAME = "mdmSubmitJobDefinition";
public static String MDM_SUBMIT_JOB= "MDM_SUBMIT";
@Bean
public GenerateRangeChunksStep submitGenerateRangeChunksStep() {
return new GenerateRangeChunksStep();
}
@Bean(name = MDM_SUBMIT_JOB_BEAN_NAME)
public JobDefinition mdmSubmitJobDefinition(IBatch2DaoSvc theBatch2DaoSvc, MatchUrlService theMatchUrlService, FhirContext theFhirContext, IMdmSettings theMdmSettings) {
return JobDefinition.newBuilder()
.setJobDefinitionId(MDM_SUBMIT_JOB)
.setJobDescription("MDM Batch Submission")
.setJobDefinitionVersion(1)
.setParametersType(MdmSubmitJobParameters.class)
.setParametersValidator(mdmSubmitJobParametersValidator(theMatchUrlService, theFhirContext, theMdmSettings))
.addFirstStep(
"generate-ranges",
"generate data ranges to submit to mdm",
PartitionedUrlChunkRangeJson.class,
submitGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",
"Load the IDs",
ResourceIdListWorkChunkJson.class,
new LoadIdsStep(theBatch2DaoSvc))
.addLastStep(
"inflate-and-submit-resources",
"Inflate and Submit resources",
mdmInflateAndSubmitResourcesStep())
.build();
}
@Bean
public MdmSubmitJobParametersValidator mdmSubmitJobParametersValidator(MatchUrlService theMatchUrlService, FhirContext theFhirContext, IMdmSettings theMdmSettings) {
return new MdmSubmitJobParametersValidator(theMdmSettings, theMatchUrlService, theFhirContext);
}
@Bean
public MdmInflateAndSubmitResourcesStep mdmInflateAndSubmitResourcesStep() {
return new MdmInflateAndSubmitResourcesStep();
}
}

View File

@ -0,0 +1,27 @@
package ca.uhn.fhir.mdm.batch2.submit;
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
public class MdmSubmitJobParameters extends PartitionedUrlListJobParameters {
}

View File

@ -0,0 +1,92 @@
package ca.uhn.fhir.mdm.batch2.submit;
/*-
* #%L
* hapi-fhir-storage-mdm
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class MdmSubmitJobParametersValidator implements IJobParametersValidator<MdmSubmitJobParameters> {
private IMdmSettings myMdmSettings;
private MatchUrlService myMatchUrlService;
private FhirContext myFhirContext;
public MdmSubmitJobParametersValidator(IMdmSettings theMdmSettings, MatchUrlService theMatchUrlService, FhirContext theFhirContext) {
myMdmSettings = theMdmSettings;
myMatchUrlService = theMatchUrlService;
myFhirContext = theFhirContext;
}
@Nonnull
@Override
public List<String> validate(@Nonnull MdmSubmitJobParameters theParameters) {
List<String> errorMsgs = new ArrayList<>();
for (PartitionedUrl partitionedUrl : theParameters.getPartitionedUrls()) {
String url = partitionedUrl.getUrl();
String resourceType = getResourceTypeFromUrl(url);
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resourceType);
validateTypeIsUsedByMdm(errorMsgs, resourceType);
validateAllSearchParametersApplyToResourceType(errorMsgs, partitionedUrl, resourceType, resourceDefinition);
}
return errorMsgs;
}
private void validateAllSearchParametersApplyToResourceType(List<String> errorMsgs, PartitionedUrl partitionedUrl, String resourceType, RuntimeResourceDefinition resourceDefinition) {
try {
myMatchUrlService.translateMatchUrl(partitionedUrl.getUrl(), resourceDefinition);
} catch (MatchUrlService.UnrecognizedSearchParameterException e) {
String errorMsg = String.format("Search parameter %s is not recognized for resource type %s. Source error is %s", e.getParamName(), resourceType, e.getMessage());
errorMsgs.add(errorMsg);
} catch (InvalidRequestException e) {
errorMsgs.add("Invalid request detected: " + e.getMessage());
}
}
private void validateTypeIsUsedByMdm(List<String> errorMsgs, String resourceType) {
if (!myMdmSettings.isSupportedMdmType(resourceType)) {
errorMsgs.add("Resource type " + resourceType + " is not supported by MDM. Check your MDM settings");
}
}
private String getResourceTypeFromUrl(String url) {
int questionMarkIndex = url.indexOf('?');
String resourceType;
if (questionMarkIndex == -1) {
resourceType = url;
} else {
resourceType = url.substring(0, questionMarkIndex);
}
return resourceType;
}
}

View File

@ -0,0 +1,67 @@
package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.rules.json.MdmRulesJson;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class MdmSubmitJobParametersValidatorTest {
@InjectMocks
MdmSubmitJobParametersValidator myValidator;
@Mock
private FhirContext myFhirContext;
@Mock
private IMdmSettings myMdmSettings;
@Mock
private MatchUrlService myMatchUrlService;
@BeforeEach
public void before() {
myFhirContext = FhirContext.forR4Cached();
myValidator = new MdmSubmitJobParametersValidator(myMdmSettings, myMatchUrlService, myFhirContext);
}
@Test
public void testUnusedMdmResourceTypesAreNotAccepted() {
when(myMdmSettings.isSupportedMdmType(anyString())).thenReturn(false);
MdmSubmitJobParameters parameters = new MdmSubmitJobParameters();
parameters.addUrl("Practitioner?name=foo");
List<String> errors = myValidator.validate(parameters);
assertThat(errors, hasSize(1));
assertThat(errors.get(0), is(equalTo("Resource type Practitioner is not supported by MDM. Check your MDM settings")));
}
@Test
public void testMissingSearchParameter() {
when(myMdmSettings.isSupportedMdmType(anyString())).thenReturn(true);
when(myMatchUrlService.translateMatchUrl(anyString(), any())).thenThrow(new InvalidRequestException("Can't find death-date!"));
MdmSubmitJobParameters parameters = new MdmSubmitJobParameters();
parameters.addUrl("Practitioner?death-date=foo");
List<String> errors = myValidator.validate(parameters);
assertThat(errors, hasSize(1));
assertThat(errors.get(0), is(equalTo("Invalid request detected: Can't find death-date!")));
}
}