Adds a NDJSON-capable Bulk Data Import Provider (#3039)

* NDJsonParser with one example test.

* Add test for empty NDJson, and fix bug for empty NDJson.

* Adds multi-Patient test, and fixes bug whereby all multi-line NDJSON would be put into the same line.

* Adds test for NDJson with newlines in it.

* Adds test for converting non-Bundle types to NDJSON failing.

* Confirm that we can only extract to Bundle types in test.

* Update hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java

Co-authored-by: James Agnew <jamesagnew@gmail.com>

* Documents behavior of the NDJsonParser in FhirContext.

* Attempt to fix failing build by using TestUtil to clear context in the manner of r4 parser tests instead of dstu

Also clean up indentation.

* Adds a BulkDataImportProvider, with a single test.  More tests are forthcoming.

* Adds an additional test.

* Enhance tests to include content of job files.  Fix a bug whereby the content was never actually being added.

* Adds several tests for BulkImportProvider's polling operation.

* Adds requred Msg.code to errors.

* Apparently I can't duplicate Msg.code even if it's the same issue.

Co-authored-by: James Agnew <jamesagnew@gmail.com>
This commit is contained in:
Ben Li-Sauerwine 2022-02-27 11:38:20 -05:00 committed by GitHub
parent c7be674b84
commit 65776bbab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 565 additions and 2 deletions

View File

@ -0,0 +1,196 @@
package ca.uhn.fhir.jpa.bulk.imprt.provider;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.PreferHeader;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.OperationOutcomeUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HeaderElement;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeaderValueParser;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkDataImportProvider {
private static final Logger ourLog = getLogger(BulkDataImportProvider.class);
@Autowired
private IBulkDataImportSvc myBulkDataImportSvc;
@Autowired
private FhirContext myFhirContext;
@VisibleForTesting
public void setFhirContextForUnitTest(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}
@VisibleForTesting
public void setBulkDataImportSvcForUnitTests(IBulkDataImportSvc theBulkDataImportSvc) {
myBulkDataImportSvc = theBulkDataImportSvc;
}
/**
* $import
*/
@Operation(name = JpaConstants.OPERATION_IMPORT, global = false /* set to true once we can handle this */, manualResponse = true, idempotent = true, manualRequest = true)
public void imprt(
@OperationParam(name = JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theJobDescription,
@OperationParam(name = JpaConstants.PARAM_IMPORT_PROCESSING_MODE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theProcessingMode,
@OperationParam(name = JpaConstants.PARAM_IMPORT_FILE_COUNT, min = 0, max = 1, typeName = "integer") IPrimitiveType<Integer> theFileCount,
@OperationParam(name = JpaConstants.PARAM_IMPORT_BATCH_SIZE, min = 0, max = 1, typeName = "integer") IPrimitiveType<Integer> theBatchSize,
ServletRequestDetails theRequestDetails
) throws IOException {
validatePreferAsyncHeader(theRequestDetails);
// Import requests are expected to be in NDJson format.
if (RestfulServerUtils.determineRequestEncodingNoDefault(theRequestDetails) != EncodingEnum.NDJSON) {
throw new InvalidRequestException(Msg.code(9001) + " An NDJson content type, like " + Constants.CT_FHIR_NDJSON.toString() + " must be provided for $import.");
}
BulkImportJobJson theImportJobJson = new BulkImportJobJson();
theImportJobJson.setJobDescription(theJobDescription == null ? null : theJobDescription.getValueAsString());
theImportJobJson.setProcessingMode(theProcessingMode == null ? JobFileRowProcessingModeEnum.FHIR_TRANSACTION : JobFileRowProcessingModeEnum.valueOf(theProcessingMode.getValueAsString()));
theImportJobJson.setBatchSize(theBatchSize == null ? 1 : theBatchSize.getValue());
theImportJobJson.setFileCount(theFileCount == null ? 1 : theFileCount.getValue());
// For now, we expect theImportJobJson.getFileCount() to be 1.
// In the future, the arguments to $import can be changed to allow additional files to be attached to an existing, known job.
// Then, when the correct number of files have been attached, the job would be started automatically.
if (theImportJobJson.getFileCount() != 1) {
throw new InvalidRequestException(Msg.code(9002) + " $import requires " + JpaConstants.PARAM_IMPORT_FILE_COUNT.toString() + " to be exactly 1.");
}
List<BulkImportJobFileJson> theInitialFiles = new ArrayList<BulkImportJobFileJson>();
BulkImportJobFileJson theJobFile = new BulkImportJobFileJson();
theJobFile.setTenantName(theRequestDetails.getTenantId());
if (theJobDescription != null) {
theJobFile.setDescription(theJobDescription.getValueAsString());
}
IParser myParser = myFhirContext.newNDJsonParser();
// We validate the NDJson by parsing it and then re-writing it.
// In the future, we could add a parameter to skip validation if desired.
theJobFile.setContents(myParser.encodeResourceToString(myParser.parseResource(theRequestDetails.getInputStream())));
theInitialFiles.add(theJobFile);
// Start the job.
// In a future change, we could add an additional parameter to add files to an existing job.
// In that world, we would only create a new job if we weren't provided an existing job ID that is to
// be augmented.
String theJob = myBulkDataImportSvc.createNewJob(theImportJobJson, theInitialFiles);
myBulkDataImportSvc.markJobAsReadyForActivation(theJob);
writePollingLocationToResponseHeaders(theRequestDetails, theJob);
}
/**
* $import-poll-status
*/
@Operation(name = JpaConstants.OPERATION_IMPORT_POLL_STATUS, manualResponse = true, idempotent = true)
public void importPollStatus(
@OperationParam(name = JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID, typeName = "string", min = 0, max = 1) IPrimitiveType<String> theJobId,
ServletRequestDetails theRequestDetails
) throws IOException {
HttpServletResponse response = theRequestDetails.getServletResponse();
theRequestDetails.getServer().addHeadersToResponse(response);
IBulkDataImportSvc.JobInfo status = myBulkDataImportSvc.getJobStatus(theJobId.getValueAsString());
IBaseOperationOutcome oo;
switch (status.getStatus()) {
case STAGING:
case READY:
case RUNNING:
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
response.addHeader(Constants.HEADER_X_PROGRESS, "Status set to " + status.getStatus() + " at " + new InstantType(status.getStatusTime()).getValueAsString());
response.addHeader(Constants.HEADER_RETRY_AFTER, "120");
break;
case COMPLETE:
response.setStatus(Constants.STATUS_HTTP_200_OK);
response.setContentType(Constants.CT_FHIR_JSON);
// Create an OperationOutcome response
oo = OperationOutcomeUtil.newInstance(myFhirContext);
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close();
break;
case ERROR:
response.setStatus(Constants.STATUS_HTTP_500_INTERNAL_ERROR);
response.setContentType(Constants.CT_FHIR_JSON);
// Create an OperationOutcome response
oo = OperationOutcomeUtil.newInstance(myFhirContext);
OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null);
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close();
}
}
public void writePollingLocationToResponseHeaders(ServletRequestDetails theRequestDetails, String theJob) {
String serverBase = getServerBase(theRequestDetails);
String pollLocation = serverBase + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + theJob;
HttpServletResponse response = theRequestDetails.getServletResponse();
// Add standard headers
theRequestDetails.getServer().addHeadersToResponse(response);
// Successful 202 Accepted
response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation);
response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED);
}
private String getServerBase(ServletRequestDetails theRequestDetails) {
return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/");
}
private void validatePreferAsyncHeader(ServletRequestDetails theRequestDetails) {
String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER);
PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader);
if (prefer.getRespondAsync() == false) {
throw new InvalidRequestException(Msg.code(9003) + " Must request async processing for $import");
}
}
}

View File

@ -234,6 +234,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
return job.toJson(); return job.toJson();
} }
@Override
public JobInfo getJobStatus(String theJobId) {
BulkImportJobEntity theJob = findJobByJobId(theJobId);
return new JobInfo()
.setStatus(theJob.getStatus())
.setStatusMessage(theJob.getStatusMessage())
.setStatusTime(theJob.getStatusTime());
}
@Transactional @Transactional
@Override @Override
public BulkImportJobFileJson fetchFile(String theJobId, int theFileIndex) { public BulkImportJobFileJson fetchFile(String theJobId, int theFileIndex) {
@ -306,6 +315,4 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
myTarget.activateNextReadyJob(); myTarget.activateNextReadyJob();
} }
} }
} }

View File

@ -21,6 +21,7 @@ import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.provider.BulkDataImportProvider;
import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl; import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc; import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl; import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl;
@ -479,6 +480,11 @@ public class JpaConfig {
return new BulkDataImportSvcImpl(); return new BulkDataImportSvcImpl();
} }
@Bean
@Lazy
public BulkDataImportProvider bulkDataImportProvider() {
return new BulkDataImportProvider();
}
@Bean @Bean
public PersistedJpaBundleProviderFactory persistedJpaBundleProviderFactory() { public PersistedJpaBundleProviderFactory persistedJpaBundleProviderFactory() {

View File

@ -0,0 +1,286 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.imprt.provider.BulkDataImportProvider;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.test.utilities.JettyUtil;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.StringType;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class BulkDataImportProviderTest {
private static final String A_JOB_ID = "0000000-AAAAAA";
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportProviderTest.class);
private Server myServer;
private final FhirContext myCtx = FhirContext.forR4Cached();
private int myPort;
@Mock
private IBulkDataImportSvc myBulkDataImportSvc;
@Mock
private IInterceptorBroadcaster myInterceptorBroadcaster;
private CloseableHttpClient myClient;
@Captor
private ArgumentCaptor<BulkImportJobJson> myBulkImportJobJsonCaptor;
@Captor
private ArgumentCaptor<List<BulkImportJobFileJson>> myBulkImportJobFileJsonCaptor;
@AfterEach
public void after() throws Exception {
JettyUtil.closeServer(myServer);
myClient.close();
}
@BeforeEach
public void start() throws Exception {
myServer = new Server(0);
BulkDataImportProvider provider = new BulkDataImportProvider();
provider.setBulkDataImportSvcForUnitTests(myBulkDataImportSvc);
provider.setFhirContextForUnitTest(myCtx);
ServletHandler proxyHandler = new ServletHandler();
RestfulServer servlet = new RestfulServer(myCtx);
servlet.registerProvider(provider);
ServletHolder servletHolder = new ServletHolder(servlet);
proxyHandler.addServletWithMapping(servletHolder, "/*");
myServer.setHandler(proxyHandler);
JettyUtil.startServer(myServer);
myPort = JettyUtil.getPortForStartedServer(myServer);
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS);
HttpClientBuilder builder = HttpClientBuilder.create();
builder.setConnectionManager(connectionManager);
myClient = builder.build();
}
@Test
public void testSuccessfulInitiateBulkRequest_Post() throws IOException {
when(myBulkDataImportSvc.createNewJob(any(), any())).thenReturn(A_JOB_ID);
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT +
"?" + JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION + "=" + UrlUtil.escapeUrlParam("My Import Job") +
"&" + JpaConstants.PARAM_IMPORT_BATCH_SIZE + "=" + UrlUtil.escapeUrlParam("100"));
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(
EntityBuilder.create()
.setContentType(ContentType.create(Constants.CT_FHIR_NDJSON))
.setText("{\"resourceType\":\"Patient\",\"id\":\"Pat1\"}\n" +
"{\"resourceType\":\"Patient\",\"id\":\"Pat2\"}\n")
.build());
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", EntityUtils.toString(response.getEntity()));
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$import-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataImportSvc, times(1)).createNewJob(myBulkImportJobJsonCaptor.capture(), myBulkImportJobFileJsonCaptor.capture());
BulkImportJobJson options = myBulkImportJobJsonCaptor.getValue();
assertEquals(1, options.getFileCount());
assertEquals(100, options.getBatchSize());
assertEquals(JobFileRowProcessingModeEnum.FHIR_TRANSACTION, options.getProcessingMode());
assertEquals("My Import Job", options.getJobDescription());
List<BulkImportJobFileJson> jobs = myBulkImportJobFileJsonCaptor.getValue();
assertEquals(1, jobs.size());
assertThat(jobs.get(0).getContents(), containsString("Pat1"));
}
@Test
public void testSuccessfulInitiateBulkRequest_Post_AllParameters() throws IOException {
when(myBulkDataImportSvc.createNewJob(any(), any())).thenReturn(A_JOB_ID);
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT +
"?" + JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION + "=" + UrlUtil.escapeUrlParam("My Import Job") +
"&" + JpaConstants.PARAM_IMPORT_PROCESSING_MODE + "=" + UrlUtil.escapeUrlParam(JobFileRowProcessingModeEnum.FHIR_TRANSACTION.toString()) +
"&" + JpaConstants.PARAM_IMPORT_BATCH_SIZE + "=" + UrlUtil.escapeUrlParam("100") +
"&" + JpaConstants.PARAM_IMPORT_FILE_COUNT + "=" + UrlUtil.escapeUrlParam("1"));
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
post.setEntity(
EntityBuilder.create()
.setContentType(ContentType.create(Constants.CT_FHIR_NDJSON))
.setText("{\"resourceType\":\"Patient\",\"id\":\"Pat1\"}\n" +
"{\"resourceType\":\"Patient\",\"id\":\"Pat2\"}\n")
.build());
ourLog.info("Request: {}", post);
try (CloseableHttpResponse response = myClient.execute(post)) {
ourLog.info("Response: {}", EntityUtils.toString(response.getEntity()));
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("http://localhost:" + myPort + "/$import-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
}
verify(myBulkDataImportSvc, times(1)).createNewJob(myBulkImportJobJsonCaptor.capture(), myBulkImportJobFileJsonCaptor.capture());
BulkImportJobJson options = myBulkImportJobJsonCaptor.getValue();
assertEquals(1, options.getFileCount());
assertEquals(100, options.getBatchSize());
assertEquals(JobFileRowProcessingModeEnum.FHIR_TRANSACTION, options.getProcessingMode());
assertEquals("My Import Job", options.getJobDescription());
List<BulkImportJobFileJson> jobs = myBulkImportJobFileJsonCaptor.getValue();
assertEquals(1, jobs.size());
assertThat(jobs.get(0).getContents(), containsString("Pat1"));
}
@Test
public void testPollForStatus_STAGING() throws IOException {
IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo()
.setStatus(BulkImportJobStatusEnum.STAGING)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue());
assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to STAGING at 20"));
}
}
@Test
public void testPollForStatus_READY() throws IOException {
IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo()
.setStatus(BulkImportJobStatusEnum.READY)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue());
assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to READY at 20"));
}
}
@Test
public void testPollForStatus_RUNNING() throws IOException {
IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo()
.setStatus(BulkImportJobStatusEnum.RUNNING)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(202, response.getStatusLine().getStatusCode());
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue());
assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to RUNNING at 20"));
}
}
@Test
public void testPollForStatus_COMPLETE() throws IOException {
IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo()
.setStatus(BulkImportJobStatusEnum.COMPLETE)
.setStatusTime(InstantType.now().getValue());
when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals("OK", response.getStatusLine().getReasonPhrase());
assertThat(response.getEntity().getContentType().getValue(), containsString(Constants.CT_FHIR_JSON));
}
}
@Test
public void testPollForStatus_ERROR() throws IOException {
IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo()
.setStatus(BulkImportJobStatusEnum.ERROR)
.setStatusMessage("It failed.")
.setStatusTime(InstantType.now().getValue());
when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" +
JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
try (CloseableHttpResponse response = myClient.execute(get)) {
ourLog.info("Response: {}", response.toString());
assertEquals(500, response.getStatusLine().getStatusCode());
assertEquals("Server Error", response.getStatusLine().getReasonPhrase());
String responseContent = IOUtils.toString(response.getEntity().getContent(), Charsets.UTF_8);
ourLog.info("Response content: {}", responseContent);
assertThat(responseContent, containsString("\"diagnostics\": \"It failed.\""));
}
}
}

View File

@ -158,6 +158,14 @@ public class JpaConstants {
* Operation name for the "$export-poll-status" operation * Operation name for the "$export-poll-status" operation
*/ */
public static final String OPERATION_EXPORT_POLL_STATUS = "$export-poll-status"; public static final String OPERATION_EXPORT_POLL_STATUS = "$export-poll-status";
/**
* Operation name for the "$import" operation
*/
public static final String OPERATION_IMPORT = "$import";
/**
* Operation name for the "$import-poll-status" operation
*/
public static final String OPERATION_IMPORT_POLL_STATUS = "$import-poll-status";
/** /**
* Operation name for the "$lastn" operation * Operation name for the "$lastn" operation
*/ */
@ -184,6 +192,27 @@ public class JpaConstants {
*/ */
public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter"; public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter";
/**
* Parameter for the $import operation
*/
public static final String PARAM_IMPORT_POLL_STATUS_JOB_ID = "_jobId";
/**
* Parameter for the $import operation
*/
public static final String PARAM_IMPORT_JOB_DESCRIPTION = "_jobDescription";
/**
* Parameter for the $import operation
*/
public static final String PARAM_IMPORT_PROCESSING_MODE = "_processingMode";
/**
* Parameter for the $import operation
*/
public static final String PARAM_IMPORT_FILE_COUNT = "_fileCount";
/**
* Parameter for the $import operation
*/
public static final String PARAM_IMPORT_BATCH_SIZE = "_batchSize";
/** /**
* The [id] of the group when $export is called on /Group/[id]/$export * The [id] of the group when $export is called on /Group/[id]/$export
*/ */

View File

@ -25,10 +25,44 @@ import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Date;
import java.util.List; import java.util.List;
public interface IBulkDataImportSvc { public interface IBulkDataImportSvc {
class JobInfo {
private BulkImportJobStatusEnum myStatus;
private Date myStatusTime;
private String myStatusMessage;
public Date getStatusTime() {
return myStatusTime;
}
public JobInfo setStatusTime(Date theStatusTime) {
myStatusTime = theStatusTime;
return this;
}
public BulkImportJobStatusEnum getStatus() {
return myStatus;
}
public JobInfo setStatus(BulkImportJobStatusEnum theStatus) {
myStatus = theStatus;
return this;
}
public String getStatusMessage() {
return myStatusMessage;
}
public JobInfo setStatusMessage(String theStatusMessage) {
myStatusMessage = theStatusMessage;
return this;
}
}
/** /**
* Create a new job in {@link ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum#STAGING STAGING} state (meaning it won't yet be worked on and can be added to) * Create a new job in {@link ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum#STAGING STAGING} state (meaning it won't yet be worked on and can be added to)
*/ */
@ -69,6 +103,11 @@ public interface IBulkDataImportSvc {
*/ */
void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage); void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage);
/**
* Gets the job status for the given job.
*/
JobInfo getJobStatus(String theJobId);
/** /**
* Gets the number of files available for a given Job ID * Gets the number of files available for a given Job ID
* *