From 65776bbab3d537ddff0243cc0c75ff03ab0a9cdf Mon Sep 17 00:00:00 2001 From: Ben Li-Sauerwine Date: Sun, 27 Feb 2022 11:38:20 -0500 Subject: [PATCH] Adds a NDJSON-capable Bulk Data Import Provider (#3039) * NDJsonParser with one example test. * Add test for empty NDJson, and fix bug for empty NDJson. * Adds multi-Patient test, and fixes bug whereby all multi-line NDJSON would be put into the same line. * Adds test for NDJson with newlines in it. * Adds test for converting non-Bundle types to NDJSON failing. * Confirm that we can only extract to Bundle types in test. * Update hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java Co-authored-by: James Agnew * Documents behavior of the NDJsonParser in FhirContext. * Attempt to fix failing build by using TestUtil to clear context in the manner of r4 parser tests instead of dstu Also clean up indentation. * Adds a BulkDataImportProvider, with a single test. More tests are forthcoming. * Adds an additional test. * Enhance tests to include content of job files. Fix a bug whereby the content was never actually being added. * Adds several tests for BulkImportProvider's polling operation. * Adds requred Msg.code to errors. * Apparently I can't duplicate Msg.code even if it's the same issue. Co-authored-by: James Agnew --- .../provider/BulkDataImportProvider.java | 196 ++++++++++++ .../bulk/imprt/svc/BulkDataImportSvcImpl.java | 11 +- .../ca/uhn/fhir/jpa/config/JpaConfig.java | 6 + .../jpa/bulk/BulkDataImportProviderTest.java | 286 ++++++++++++++++++ .../uhn/fhir/jpa/model/util/JpaConstants.java | 29 ++ .../bulk/imprt/api/IBulkDataImportSvc.java | 39 +++ 6 files changed, 565 insertions(+), 2 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/provider/BulkDataImportProvider.java create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataImportProviderTest.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/provider/BulkDataImportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/provider/BulkDataImportProvider.java new file mode 100644 index 00000000000..4ff4a14a8f9 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/provider/BulkDataImportProvider.java @@ -0,0 +1,196 @@ +package ca.uhn.fhir.jpa.bulk.imprt.provider; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import ca.uhn.fhir.i18n.Msg; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.parser.IParser; +import ca.uhn.fhir.rest.annotation.Operation; +import ca.uhn.fhir.rest.annotation.OperationParam; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.EncodingEnum; +import ca.uhn.fhir.rest.api.PreferHeader; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.rest.server.RestfulServerUtils; +import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; +import ca.uhn.fhir.util.OperationOutcomeUtil; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.input.ReaderInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HeaderElement; +import org.apache.http.NameValuePair; +import org.apache.http.message.BasicHeaderValueParser; +import org.hl7.fhir.instance.model.api.IBaseOperationOutcome; +import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.hl7.fhir.r4.model.InstantType; +import org.slf4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletResponse; +import static org.slf4j.LoggerFactory.getLogger; + + +public class BulkDataImportProvider { + private static final Logger ourLog = getLogger(BulkDataImportProvider.class); + + @Autowired + private IBulkDataImportSvc myBulkDataImportSvc; + + @Autowired + private FhirContext myFhirContext; + + @VisibleForTesting + public void setFhirContextForUnitTest(FhirContext theFhirContext) { + myFhirContext = theFhirContext; + } + + @VisibleForTesting + public void setBulkDataImportSvcForUnitTests(IBulkDataImportSvc theBulkDataImportSvc) { + myBulkDataImportSvc = theBulkDataImportSvc; + } + + /** + * $import + */ + @Operation(name = JpaConstants.OPERATION_IMPORT, global = false /* set to true once we can handle this */, manualResponse = true, idempotent = true, manualRequest = true) + public void imprt( + @OperationParam(name = JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION, min = 0, max = 1, typeName = "string") IPrimitiveType theJobDescription, + @OperationParam(name = JpaConstants.PARAM_IMPORT_PROCESSING_MODE, min = 0, max = 1, typeName = "string") IPrimitiveType theProcessingMode, + @OperationParam(name = JpaConstants.PARAM_IMPORT_FILE_COUNT, min = 0, max = 1, typeName = "integer") IPrimitiveType theFileCount, + @OperationParam(name = JpaConstants.PARAM_IMPORT_BATCH_SIZE, min = 0, max = 1, typeName = "integer") IPrimitiveType theBatchSize, + ServletRequestDetails theRequestDetails + ) throws IOException { + validatePreferAsyncHeader(theRequestDetails); + + // Import requests are expected to be in NDJson format. + if (RestfulServerUtils.determineRequestEncodingNoDefault(theRequestDetails) != EncodingEnum.NDJSON) { + throw new InvalidRequestException(Msg.code(9001) + " An NDJson content type, like " + Constants.CT_FHIR_NDJSON.toString() + " must be provided for $import."); + } + + BulkImportJobJson theImportJobJson = new BulkImportJobJson(); + theImportJobJson.setJobDescription(theJobDescription == null ? null : theJobDescription.getValueAsString()); + + theImportJobJson.setProcessingMode(theProcessingMode == null ? JobFileRowProcessingModeEnum.FHIR_TRANSACTION : JobFileRowProcessingModeEnum.valueOf(theProcessingMode.getValueAsString())); + theImportJobJson.setBatchSize(theBatchSize == null ? 1 : theBatchSize.getValue()); + theImportJobJson.setFileCount(theFileCount == null ? 1 : theFileCount.getValue()); + + // For now, we expect theImportJobJson.getFileCount() to be 1. + // In the future, the arguments to $import can be changed to allow additional files to be attached to an existing, known job. + // Then, when the correct number of files have been attached, the job would be started automatically. + if (theImportJobJson.getFileCount() != 1) { + throw new InvalidRequestException(Msg.code(9002) + " $import requires " + JpaConstants.PARAM_IMPORT_FILE_COUNT.toString() + " to be exactly 1."); + } + + List theInitialFiles = new ArrayList(); + + BulkImportJobFileJson theJobFile = new BulkImportJobFileJson(); + theJobFile.setTenantName(theRequestDetails.getTenantId()); + if (theJobDescription != null) { + theJobFile.setDescription(theJobDescription.getValueAsString()); + } + + IParser myParser = myFhirContext.newNDJsonParser(); + + // We validate the NDJson by parsing it and then re-writing it. + // In the future, we could add a parameter to skip validation if desired. + theJobFile.setContents(myParser.encodeResourceToString(myParser.parseResource(theRequestDetails.getInputStream()))); + + theInitialFiles.add(theJobFile); + + // Start the job. + // In a future change, we could add an additional parameter to add files to an existing job. + // In that world, we would only create a new job if we weren't provided an existing job ID that is to + // be augmented. + String theJob = myBulkDataImportSvc.createNewJob(theImportJobJson, theInitialFiles); + myBulkDataImportSvc.markJobAsReadyForActivation(theJob); + writePollingLocationToResponseHeaders(theRequestDetails, theJob); + } + + /** + * $import-poll-status + */ + @Operation(name = JpaConstants.OPERATION_IMPORT_POLL_STATUS, manualResponse = true, idempotent = true) + public void importPollStatus( + @OperationParam(name = JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID, typeName = "string", min = 0, max = 1) IPrimitiveType theJobId, + ServletRequestDetails theRequestDetails + ) throws IOException { + HttpServletResponse response = theRequestDetails.getServletResponse(); + theRequestDetails.getServer().addHeadersToResponse(response); + IBulkDataImportSvc.JobInfo status = myBulkDataImportSvc.getJobStatus(theJobId.getValueAsString()); + IBaseOperationOutcome oo; + switch (status.getStatus()) { + case STAGING: + case READY: + case RUNNING: + response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED); + response.addHeader(Constants.HEADER_X_PROGRESS, "Status set to " + status.getStatus() + " at " + new InstantType(status.getStatusTime()).getValueAsString()); + response.addHeader(Constants.HEADER_RETRY_AFTER, "120"); + break; + case COMPLETE: + response.setStatus(Constants.STATUS_HTTP_200_OK); + response.setContentType(Constants.CT_FHIR_JSON); + // Create an OperationOutcome response + oo = OperationOutcomeUtil.newInstance(myFhirContext); + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter()); + response.getWriter().close(); + break; + case ERROR: + response.setStatus(Constants.STATUS_HTTP_500_INTERNAL_ERROR); + response.setContentType(Constants.CT_FHIR_JSON); + // Create an OperationOutcome response + oo = OperationOutcomeUtil.newInstance(myFhirContext); + OperationOutcomeUtil.addIssue(myFhirContext, oo, "error", status.getStatusMessage(), null, null); + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter()); + response.getWriter().close(); + } + } + + public void writePollingLocationToResponseHeaders(ServletRequestDetails theRequestDetails, String theJob) { + String serverBase = getServerBase(theRequestDetails); + String pollLocation = serverBase + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + theJob; + HttpServletResponse response = theRequestDetails.getServletResponse(); + // Add standard headers + theRequestDetails.getServer().addHeadersToResponse(response); + // Successful 202 Accepted + response.addHeader(Constants.HEADER_CONTENT_LOCATION, pollLocation); + response.setStatus(Constants.STATUS_HTTP_202_ACCEPTED); + } + + private String getServerBase(ServletRequestDetails theRequestDetails) { + return StringUtils.removeEnd(theRequestDetails.getServerBaseForRequest(), "/"); + } + + private void validatePreferAsyncHeader(ServletRequestDetails theRequestDetails) { + String preferHeader = theRequestDetails.getHeader(Constants.HEADER_PREFER); + PreferHeader prefer = RestfulServerUtils.parsePreferHeader(null, preferHeader); + if (prefer.getRespondAsync() == false) { + throw new InvalidRequestException(Msg.code(9003) + " Must request async processing for $import"); + } + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java index ac08aac51f6..54d64c97e05 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/svc/BulkDataImportSvcImpl.java @@ -234,6 +234,15 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { return job.toJson(); } + @Override + public JobInfo getJobStatus(String theJobId) { + BulkImportJobEntity theJob = findJobByJobId(theJobId); + return new JobInfo() + .setStatus(theJob.getStatus()) + .setStatusMessage(theJob.getStatusMessage()) + .setStatusTime(theJob.getStatusTime()); + } + @Transactional @Override public BulkImportJobFileJson fetchFile(String theJobId, int theFileIndex) { @@ -306,6 +315,4 @@ public class BulkDataImportSvcImpl implements IBulkDataImportSvc { myTarget.activateNextReadyJob(); } } - - } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java index 38d2da8dd65..dc43282b3cc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaConfig.java @@ -21,6 +21,7 @@ import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc; import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider; import ca.uhn.fhir.jpa.bulk.export.svc.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.provider.BulkDataImportProvider; import ca.uhn.fhir.jpa.bulk.imprt.svc.BulkDataImportSvcImpl; import ca.uhn.fhir.jpa.cache.IResourceVersionSvc; import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl; @@ -479,6 +480,11 @@ public class JpaConfig { return new BulkDataImportSvcImpl(); } + @Bean + @Lazy + public BulkDataImportProvider bulkDataImportProvider() { + return new BulkDataImportProvider(); + } @Bean public PersistedJpaBundleProviderFactory persistedJpaBundleProviderFactory() { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataImportProviderTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataImportProviderTest.java new file mode 100644 index 00000000000..1d80ebd8062 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/BulkDataImportProviderTest.java @@ -0,0 +1,286 @@ +package ca.uhn.fhir.jpa.bulk; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; +import ca.uhn.fhir.jpa.bulk.imprt.provider.BulkDataImportProvider; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; +import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; +import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.client.apache.ResourceEntity; +import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.util.UrlUtil; +import ca.uhn.fhir.test.utilities.JettyUtil; +import com.google.common.base.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.entity.EntityBuilder; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.util.EntityUtils; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.hl7.fhir.r4.model.InstantType; +import org.hl7.fhir.r4.model.IntegerType; +import org.hl7.fhir.r4.model.StringType; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +public class BulkDataImportProviderTest { + private static final String A_JOB_ID = "0000000-AAAAAA"; + private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportProviderTest.class); + private Server myServer; + private final FhirContext myCtx = FhirContext.forR4Cached(); + private int myPort; + @Mock + private IBulkDataImportSvc myBulkDataImportSvc; + @Mock + private IInterceptorBroadcaster myInterceptorBroadcaster; + private CloseableHttpClient myClient; + @Captor + private ArgumentCaptor myBulkImportJobJsonCaptor; + @Captor + private ArgumentCaptor> myBulkImportJobFileJsonCaptor; + + @AfterEach + public void after() throws Exception { + JettyUtil.closeServer(myServer); + myClient.close(); + } + + @BeforeEach + public void start() throws Exception { + myServer = new Server(0); + + BulkDataImportProvider provider = new BulkDataImportProvider(); + provider.setBulkDataImportSvcForUnitTests(myBulkDataImportSvc); + provider.setFhirContextForUnitTest(myCtx); + + ServletHandler proxyHandler = new ServletHandler(); + RestfulServer servlet = new RestfulServer(myCtx); + servlet.registerProvider(provider); + ServletHolder servletHolder = new ServletHolder(servlet); + proxyHandler.addServletWithMapping(servletHolder, "/*"); + myServer.setHandler(proxyHandler); + JettyUtil.startServer(myServer); + myPort = JettyUtil.getPortForStartedServer(myServer); + + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS); + HttpClientBuilder builder = HttpClientBuilder.create(); + builder.setConnectionManager(connectionManager); + myClient = builder.build(); + } + + @Test + public void testSuccessfulInitiateBulkRequest_Post() throws IOException { + when(myBulkDataImportSvc.createNewJob(any(), any())).thenReturn(A_JOB_ID); + + HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT + + "?" + JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION + "=" + UrlUtil.escapeUrlParam("My Import Job") + + "&" + JpaConstants.PARAM_IMPORT_BATCH_SIZE + "=" + UrlUtil.escapeUrlParam("100")); + + post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + post.setEntity( + EntityBuilder.create() + .setContentType(ContentType.create(Constants.CT_FHIR_NDJSON)) + .setText("{\"resourceType\":\"Patient\",\"id\":\"Pat1\"}\n" + + "{\"resourceType\":\"Patient\",\"id\":\"Pat2\"}\n") + .build()); + + ourLog.info("Request: {}", post); + try (CloseableHttpResponse response = myClient.execute(post)) { + ourLog.info("Response: {}", EntityUtils.toString(response.getEntity())); + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$import-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + verify(myBulkDataImportSvc, times(1)).createNewJob(myBulkImportJobJsonCaptor.capture(), myBulkImportJobFileJsonCaptor.capture()); + BulkImportJobJson options = myBulkImportJobJsonCaptor.getValue(); + assertEquals(1, options.getFileCount()); + assertEquals(100, options.getBatchSize()); + assertEquals(JobFileRowProcessingModeEnum.FHIR_TRANSACTION, options.getProcessingMode()); + assertEquals("My Import Job", options.getJobDescription()); + List jobs = myBulkImportJobFileJsonCaptor.getValue(); + assertEquals(1, jobs.size()); + assertThat(jobs.get(0).getContents(), containsString("Pat1")); + } + + @Test + public void testSuccessfulInitiateBulkRequest_Post_AllParameters() throws IOException { + when(myBulkDataImportSvc.createNewJob(any(), any())).thenReturn(A_JOB_ID); + + HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT + + "?" + JpaConstants.PARAM_IMPORT_JOB_DESCRIPTION + "=" + UrlUtil.escapeUrlParam("My Import Job") + + "&" + JpaConstants.PARAM_IMPORT_PROCESSING_MODE + "=" + UrlUtil.escapeUrlParam(JobFileRowProcessingModeEnum.FHIR_TRANSACTION.toString()) + + "&" + JpaConstants.PARAM_IMPORT_BATCH_SIZE + "=" + UrlUtil.escapeUrlParam("100") + + "&" + JpaConstants.PARAM_IMPORT_FILE_COUNT + "=" + UrlUtil.escapeUrlParam("1")); + + post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + post.setEntity( + EntityBuilder.create() + .setContentType(ContentType.create(Constants.CT_FHIR_NDJSON)) + .setText("{\"resourceType\":\"Patient\",\"id\":\"Pat1\"}\n" + + "{\"resourceType\":\"Patient\",\"id\":\"Pat2\"}\n") + .build()); + + ourLog.info("Request: {}", post); + try (CloseableHttpResponse response = myClient.execute(post)) { + ourLog.info("Response: {}", EntityUtils.toString(response.getEntity())); + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("http://localhost:" + myPort + "/$import-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue()); + } + + verify(myBulkDataImportSvc, times(1)).createNewJob(myBulkImportJobJsonCaptor.capture(), myBulkImportJobFileJsonCaptor.capture()); + BulkImportJobJson options = myBulkImportJobJsonCaptor.getValue(); + assertEquals(1, options.getFileCount()); + assertEquals(100, options.getBatchSize()); + assertEquals(JobFileRowProcessingModeEnum.FHIR_TRANSACTION, options.getProcessingMode()); + assertEquals("My Import Job", options.getJobDescription()); + List jobs = myBulkImportJobFileJsonCaptor.getValue(); + assertEquals(1, jobs.size()); + assertThat(jobs.get(0).getContents(), containsString("Pat1")); + } + + @Test + public void testPollForStatus_STAGING() throws IOException { + + IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo() + .setStatus(BulkImportJobStatusEnum.STAGING) + .setStatusTime(InstantType.now().getValue()); + when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue()); + assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to STAGING at 20")); + } + } + + @Test + public void testPollForStatus_READY() throws IOException { + + IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo() + .setStatus(BulkImportJobStatusEnum.READY) + .setStatusTime(InstantType.now().getValue()); + when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue()); + assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to READY at 20")); + } + } + + @Test + public void testPollForStatus_RUNNING() throws IOException { + + IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo() + .setStatus(BulkImportJobStatusEnum.RUNNING) + .setStatusTime(InstantType.now().getValue()); + when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(202, response.getStatusLine().getStatusCode()); + assertEquals("Accepted", response.getStatusLine().getReasonPhrase()); + assertEquals("120", response.getFirstHeader(Constants.HEADER_RETRY_AFTER).getValue()); + assertThat(response.getFirstHeader(Constants.HEADER_X_PROGRESS).getValue(), containsString("Status set to RUNNING at 20")); + } + } + + @Test + public void testPollForStatus_COMPLETE() throws IOException { + IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo() + .setStatus(BulkImportJobStatusEnum.COMPLETE) + .setStatusTime(InstantType.now().getValue()); + when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals("OK", response.getStatusLine().getReasonPhrase()); + assertThat(response.getEntity().getContentType().getValue(), containsString(Constants.CT_FHIR_JSON)); + } + } + + @Test + public void testPollForStatus_ERROR() throws IOException { + IBulkDataImportSvc.JobInfo jobInfo = new IBulkDataImportSvc.JobInfo() + .setStatus(BulkImportJobStatusEnum.ERROR) + .setStatusMessage("It failed.") + .setStatusTime(InstantType.now().getValue()); + when(myBulkDataImportSvc.getJobStatus(eq(A_JOB_ID))).thenReturn(jobInfo); + + String url = "http://localhost:" + myPort + "/" + JpaConstants.OPERATION_IMPORT_POLL_STATUS + "?" + + JpaConstants.PARAM_IMPORT_POLL_STATUS_JOB_ID + "=" + A_JOB_ID; + HttpGet get = new HttpGet(url); + get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC); + try (CloseableHttpResponse response = myClient.execute(get)) { + ourLog.info("Response: {}", response.toString()); + + assertEquals(500, response.getStatusLine().getStatusCode()); + assertEquals("Server Error", response.getStatusLine().getReasonPhrase()); + String responseContent = IOUtils.toString(response.getEntity().getContent(), Charsets.UTF_8); + ourLog.info("Response content: {}", responseContent); + assertThat(responseContent, containsString("\"diagnostics\": \"It failed.\"")); + } + } +} diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java index bc24cefe38c..5db91809e44 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java @@ -158,6 +158,14 @@ public class JpaConstants { * Operation name for the "$export-poll-status" operation */ public static final String OPERATION_EXPORT_POLL_STATUS = "$export-poll-status"; + /** + * Operation name for the "$import" operation + */ + public static final String OPERATION_IMPORT = "$import"; + /** + * Operation name for the "$import-poll-status" operation + */ + public static final String OPERATION_IMPORT_POLL_STATUS = "$import-poll-status"; /** * Operation name for the "$lastn" operation */ @@ -184,6 +192,27 @@ public class JpaConstants { */ public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter"; + /** + * Parameter for the $import operation + */ + public static final String PARAM_IMPORT_POLL_STATUS_JOB_ID = "_jobId"; + /** + * Parameter for the $import operation + */ + public static final String PARAM_IMPORT_JOB_DESCRIPTION = "_jobDescription"; + /** + * Parameter for the $import operation + */ + public static final String PARAM_IMPORT_PROCESSING_MODE = "_processingMode"; + /** + * Parameter for the $import operation + */ + public static final String PARAM_IMPORT_FILE_COUNT = "_fileCount"; + /** + * Parameter for the $import operation + */ + public static final String PARAM_IMPORT_BATCH_SIZE = "_batchSize"; + /** * The [id] of the group when $export is called on /Group/[id]/$export */ diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java index 7f1dc193c0d..b0c9ee0b0a9 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/imprt/api/IBulkDataImportSvc.java @@ -25,10 +25,44 @@ import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; import javax.annotation.Nonnull; +import java.util.Date; import java.util.List; public interface IBulkDataImportSvc { + class JobInfo { + private BulkImportJobStatusEnum myStatus; + private Date myStatusTime; + private String myStatusMessage; + + public Date getStatusTime() { + return myStatusTime; + } + + public JobInfo setStatusTime(Date theStatusTime) { + myStatusTime = theStatusTime; + return this; + } + + public BulkImportJobStatusEnum getStatus() { + return myStatus; + } + + public JobInfo setStatus(BulkImportJobStatusEnum theStatus) { + myStatus = theStatus; + return this; + } + + public String getStatusMessage() { + return myStatusMessage; + } + + public JobInfo setStatusMessage(String theStatusMessage) { + myStatusMessage = theStatusMessage; + return this; + } + } + /** * Create a new job in {@link ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum#STAGING STAGING} state (meaning it won't yet be worked on and can be added to) */ @@ -69,6 +103,11 @@ public interface IBulkDataImportSvc { */ void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage); + /** + * Gets the job status for the given job. + */ + JobInfo getJobStatus(String theJobId); + /** * Gets the number of files available for a given Job ID *