diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java index 49012c1c797..8ab1da3a0cf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobConfig.java @@ -46,6 +46,10 @@ import java.util.List; */ @Configuration public class BulkExportJobConfig { + public static final String JOB_UUID_PARAMETER = "jobUUID"; + public static final String READ_CHUNK_PARAMETER = "readChunkSize"; + public static final String GROUP_ID_PARAMETER = "groupId"; + public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes"; @Autowired private StepBuilderFactory myStepBuilderFactory; @@ -77,6 +81,7 @@ public class BulkExportJobConfig { public Job groupBulkExportJob() { return myJobBuilderFactory.get("groupBulkExportJob") .validator(groupBulkJobParameterValidator()) + .validator(bulkJobParameterValidator()) .start(createBulkExportEntityStep()) .next(groupPartitionStep()) .next(closeJobStep()) @@ -84,9 +89,8 @@ public class BulkExportJobConfig { } @Bean - public JobParametersValidator groupBulkJobParameterValidator() { - return null; - //TODO GGG + public GroupIdPresentValidator groupBulkJobParameterValidator() { + return new GroupIdPresentValidator(); } @Bean diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParameterValidator.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParameterValidator.java index ac80443dbfc..1f937cacbed 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParameterValidator.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/BulkExportJobParameterValidator.java @@ -57,11 +57,11 @@ public class BulkExportJobParameterValidator implements JobParametersValidator { TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); String errorMessage = txTemplate.execute(tx -> { StringBuilder errorBuilder = new StringBuilder(); - Long readChunkSize = theJobParameters.getLong("readChunkSize"); + Long readChunkSize = theJobParameters.getLong(BulkExportJobConfig.READ_CHUNK_PARAMETER); if (readChunkSize == null || readChunkSize < 1) { errorBuilder.append("There must be a valid number for readChunkSize, which is at least 1. "); } - String jobUUID = theJobParameters.getString("jobUUID"); + String jobUUID = theJobParameters.getString(BulkExportJobConfig.JOB_UUID_PARAMETER); Optional oJob = myBulkExportJobDao.findByJobId(jobUUID); if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) { errorBuilder.append("There is no persisted job that exists with UUID: " + jobUUID + ". "); @@ -71,9 +71,9 @@ public class BulkExportJobParameterValidator implements JobParametersValidator { boolean hasExistingJob = oJob.isPresent(); //Check for to-be-created parameters. if (!hasExistingJob) { - String resourceTypes = theJobParameters.getString("resourceTypes"); + String resourceTypes = theJobParameters.getString(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER); if (StringUtils.isBlank(resourceTypes)) { - errorBuilder.append("You must include [resourceTypes] as a Job Parameter"); + errorBuilder.append("You must include [").append(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER).append("] as a Job Parameter"); } else { String[] resourceArray = resourceTypes.split(","); Arrays.stream(resourceArray).filter(resourceType -> resourceType.equalsIgnoreCase("Binary")) @@ -89,9 +89,6 @@ public class BulkExportJobParameterValidator implements JobParametersValidator { } } - if (theJobParameters.getString("groupId") != null) { - ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString("groupId")); - } return errorBuilder.toString(); }); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java index 774fe9c4385..ea50a65da24 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupBulkItemReader.java @@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; import ca.uhn.fhir.jpa.model.util.JpaConstants; -import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.model.api.Include; @@ -56,19 +55,22 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; public class GroupBulkItemReader implements ItemReader> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); Iterator myPidIterator; - @Value("#{jobParameters['readChunkSize']}") - private Long READ_CHUNK_SIZE; - @Value("#{jobExecutionContext['jobUUID']}") - private String myJobUUID; + @Value("#{stepExecutionContext['resourceType']}") private String myResourceType; + @Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}") + private String myGroupId; + @Value("#{jobParameters['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}") + private String myJobUUID; + @Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}") + private Long myReadChunkSize; + @Autowired private IBulkExportJobDao myBulkExportJobDao; @Autowired @@ -92,7 +94,7 @@ public class GroupBulkItemReader implements ItemReader outgoing = new ArrayList<>(); - while (myPidIterator.hasNext() && count < READ_CHUNK_SIZE) { + while (myPidIterator.hasNext() && count < myReadChunkSize) { outgoing.add(myPidIterator.next()); count += 1; } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupIdPresentValidator.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupIdPresentValidator.java new file mode 100644 index 00000000000..a28eaaf0338 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/job/GroupIdPresentValidator.java @@ -0,0 +1,44 @@ +package ca.uhn.fhir.jpa.bulk.job; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.slf4j.Logger; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.JobParametersValidator; + + +import static ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig.*; +import static org.slf4j.LoggerFactory.getLogger; + +public class GroupIdPresentValidator implements JobParametersValidator { + private static final Logger ourLog = getLogger(GroupIdPresentValidator.class); + + @Override + public void validate(JobParameters theJobParameters) throws JobParametersInvalidException { + + if (theJobParameters == null || theJobParameters.getString(GROUP_ID_PARAMETER) == null) { + throw new JobParametersInvalidException("Group Bulk Export jobs must have a " + GROUP_ID_PARAMETER + " attribute"); + } else { + ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString(GROUP_ID_PARAMETER)); + } + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java index 1f4fd095a91..c4de32a3827 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/provider/BulkDataExportProvider.java @@ -189,8 +189,6 @@ public class BulkDataExportProvider { @IdParam IIdType theIdParam, @OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType theOutputFormat, @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType theType, - @OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType theSince, - @OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType theTypeFilter, @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType theMdm, ServletRequestDetails theRequestDetails ) { @@ -208,15 +206,10 @@ public class BulkDataExportProvider { resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString()); } - Date since = null; - if (theSince != null) { - since = theSince.getValue(); - } - + //TODO GGG eventually, we will support these things. Set filters = null; - if (theTypeFilter != null) { - filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString()); - } + Date since = null; + IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, theMdm.getValue())); String serverBase = getServerBase(theRequestDetails); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java index 4a82f5a741c..5054dd416ad 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/svc/BulkDataExportSvcImpl.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter; import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions; import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc; +import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig; import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao; import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao; @@ -43,6 +44,7 @@ import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.util.UrlUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseBinary; import org.hl7.fhir.instance.model.api.IIdType; @@ -63,6 +65,7 @@ import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.PostConstruct; import javax.transaction.Transactional; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.Map; @@ -135,9 +138,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get(); String jobUuid = bulkExportJobEntity.getJobId(); - boolean isForGroupExport = containsGroupId(bulkExportJobEntity.getRequest()); + String theGroupId = getGroupIdIfPresent(bulkExportJobEntity.getRequest()); try { - processJob(jobUuid, isForGroupExport); + processJob(jobUuid, theGroupId); } catch (Exception e) { ourLog.error("Failure while preparing bulk export extract", e); myTxTemplate.execute(t -> { @@ -153,14 +156,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } } - private boolean containsGroupId(String theRequestString) { + private String getGroupIdIfPresent(String theRequestString) { Map stringMap = UrlUtil.parseQueryString(theRequestString); - String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID); - if (strings != null && strings.length > 0) { - return true; - } else { - return false; - } + return String.join(",", stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID)); } @@ -211,18 +209,19 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc { } - private void processJob(String theJobUuid, boolean theIsGroupRequest) { - JobParameters parameters = new JobParametersBuilder() - .addString("jobUUID", theJobUuid) - .addLong("readChunkSize", READ_CHUNK_SIZE).toJobParameters(); + private void processJob(String theJobUuid, String theGroupId) { + JobParametersBuilder parameters = new JobParametersBuilder() + .addString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUuid) + .addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE); ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid); try { - if (theIsGroupRequest) { - myJobSubmitter.runJob(myGroupBulkExportJob, parameters); + if (!StringUtils.isBlank(theGroupId)) { + parameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId); + myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters()); } else { - myJobSubmitter.runJob(myBulkExportJob, parameters); + myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters()); } } catch (JobParametersInvalidException theE) { ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());