Minor refactor

Tadgh 2021-02-24 15:14:59 -05:00
parent 30905aeba7
commit 0033827f57
6 changed files with 84 additions and 47 deletions

BulkExportJobConfig.java

@@ -46,6 +46,10 @@ import java.util.List;
*/
@Configuration
public class BulkExportJobConfig {
public static final String JOB_UUID_PARAMETER = "jobUUID";
public static final String READ_CHUNK_PARAMETER = "readChunkSize";
public static final String GROUP_ID_PARAMETER = "groupId";
public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes";
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@@ -77,6 +81,7 @@ public class BulkExportJobConfig {
public Job groupBulkExportJob() {
return myJobBuilderFactory.get("groupBulkExportJob")
.validator(groupBulkJobParameterValidator())
.validator(bulkJobParameterValidator())
.start(createBulkExportEntityStep())
.next(groupPartitionStep())
.next(closeJobStep())
@@ -84,9 +89,8 @@ public class BulkExportJobConfig {
}
@Bean
public JobParametersValidator groupBulkJobParameterValidator() {
return null;
//TODO GGG
public GroupIdPresentValidator groupBulkJobParameterValidator() {
return new GroupIdPresentValidator();
}
@Bean
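The group job above now chains two .validator(...) calls. As an aside, a minimal sketch (not part of this commit, and only one possible wiring) of combining the same two checks into a single bean with Spring Batch's org.springframework.batch.core.job.CompositeJobParametersValidator, which runs its delegates in order:

@Bean
public JobParametersValidator combinedGroupBulkJobValidator() throws Exception {
	// Hypothetical alternative, assuming the two validators are composed explicitly.
	CompositeJobParametersValidator composite = new CompositeJobParametersValidator();
	composite.setValidators(Arrays.asList(new GroupIdPresentValidator(), bulkJobParameterValidator()));
	composite.afterPropertiesSet(); // fails fast if no delegate validators were set
	return composite;
}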

BulkExportJobParameterValidator.java

@@ -57,11 +57,11 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
String errorMessage = txTemplate.execute(tx -> {
StringBuilder errorBuilder = new StringBuilder();
Long readChunkSize = theJobParameters.getLong("readChunkSize");
Long readChunkSize = theJobParameters.getLong(BulkExportJobConfig.READ_CHUNK_PARAMETER);
if (readChunkSize == null || readChunkSize < 1) {
errorBuilder.append("There must be a valid number for readChunkSize, which is at least 1. ");
}
String jobUUID = theJobParameters.getString("jobUUID");
String jobUUID = theJobParameters.getString(BulkExportJobConfig.JOB_UUID_PARAMETER);
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
if (!StringUtils.isBlank(jobUUID) && !oJob.isPresent()) {
errorBuilder.append("There is no persisted job that exists with UUID: " + jobUUID + ". ");
@@ -71,9 +71,9 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
boolean hasExistingJob = oJob.isPresent();
//Check for to-be-created parameters.
if (!hasExistingJob) {
String resourceTypes = theJobParameters.getString("resourceTypes");
String resourceTypes = theJobParameters.getString(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER);
if (StringUtils.isBlank(resourceTypes)) {
errorBuilder.append("You must include [resourceTypes] as a Job Parameter");
errorBuilder.append("You must include [").append(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER).append("] as a Job Parameter");
} else {
String[] resourceArray = resourceTypes.split(",");
Arrays.stream(resourceArray).filter(resourceType -> resourceType.equalsIgnoreCase("Binary"))
@@ -89,9 +89,6 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
}
}
if (theJobParameters.getString("groupId") != null) {
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString("groupId"));
}
return errorBuilder.toString();
});
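With the string literals replaced by the BulkExportJobConfig constants, the validator and the job submitters now agree on parameter names by construction. For illustration, parameters built with the same constants (hypothetical values) that would satisfy the checks above:

JobParameters params = new JobParametersBuilder()
	.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, "some-existing-job-uuid") // hypothetical value
	.addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, 100L) // must be at least 1
	.addString(BulkExportJobConfig.RESOURCE_TYPES_PARAMETER, "Patient,Observation")
	.toJobParameters();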

GroupBulkItemReader.java

@@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.Include;
@@ -56,19 +55,22 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{jobParameters['readChunkSize']}")
private Long READ_CHUNK_SIZE;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}")
private String myGroupId;
@Value("#{jobParameters['"+ BulkExportJobConfig.JOB_UUID_PARAMETER+"']}")
private String myJobUUID;
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
private Long myReadChunkSize;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
@@ -92,7 +94,7 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
//Fetch all the pids given the query.
ISearchBuilder searchBuilder = getSearchBuilder();
//Build comlex-ish _has query with a revincludes which allows lookup by group membership
//Build complex-ish _has query with a revincludes which allows lookup by group membership
SearchParameterMap searchParameterMap = getSearchParameterMap(jobEntity);
IResultIterator resultIterator = searchBuilder.createQuery(
@@ -129,9 +131,7 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
@Nonnull
private SearchParameterMap getSearchParameterMap(BulkExportJobEntity jobEntity) {
SearchParameterMap searchParameterMap = new SearchParameterMap();
String groupIdFromRequest = getGroupIdFromRequest(jobEntity);
searchParameterMap.add("_has", new HasParam("Group", "member", "_id", groupIdFromRequest));
searchParameterMap.add("_has", new HasParam("Group", "member", "_id", myGroupId));
String revIncludeString = buildRevIncludeString();
searchParameterMap.addRevInclude(new Include(revIncludeString).toLocked());
@@ -182,7 +182,7 @@ public class GroupBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
}
int count = 0;
List<ResourcePersistentId> outgoing = new ArrayList<>();
while (myPidIterator.hasNext() && count < READ_CHUNK_SIZE) {
while (myPidIterator.hasNext() && count < myReadChunkSize) {
outgoing.add(myPidIterator.next());
count += 1;
}
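The #{jobParameters[...]} expressions above rely on Spring Batch late binding, which is only applied to step- or job-scoped beans. A sketch of the wiring this assumes (the actual bean declaration lives in BulkExportJobConfig and is not shown in this diff):

@Bean
@StepScope // required so the #{jobParameters[...]} placeholders resolve at runtime
public GroupBulkItemReader groupBulkItemReader() {
	return new GroupBulkItemReader();
}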

GroupIdPresentValidator.java (new file)

@@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.bulk.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.slf4j.Logger;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import static ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig.*;
import static org.slf4j.LoggerFactory.getLogger;
public class GroupIdPresentValidator implements JobParametersValidator {
private static final Logger ourLog = getLogger(GroupIdPresentValidator.class);
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
if (theJobParameters == null || theJobParameters.getString(GROUP_ID_PARAMETER) == null) {
throw new JobParametersInvalidException("Group Bulk Export jobs must have a " + GROUP_ID_PARAMETER + " attribute");
} else {
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString(GROUP_ID_PARAMETER));
}
}
}
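To illustrate the contract this new validator enforces (hypothetical values, in a context that declares JobParametersInvalidException):

JobParametersValidator validator = new GroupIdPresentValidator();

// Passes: a group id is present.
validator.validate(new JobParametersBuilder()
	.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, "my-group-id")
	.toJobParameters());

// Throws JobParametersInvalidException: no group id was supplied.
validator.validate(new JobParametersBuilder().toJobParameters());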

BulkDataExportProvider.java

@@ -189,8 +189,6 @@ public class BulkDataExportProvider {
@IdParam IIdType theIdParam,
@OperationParam(name = JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theOutputFormat,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theType,
@OperationParam(name = JpaConstants.PARAM_EXPORT_SINCE, min = 0, max = 1, typeName = "instant") IPrimitiveType<Date> theSince,
@OperationParam(name = JpaConstants.PARAM_EXPORT_TYPE_FILTER, min = 0, max = 1, typeName = "string") IPrimitiveType<String> theTypeFilter,
@OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") IPrimitiveType<Boolean> theMdm,
ServletRequestDetails theRequestDetails
) {
@@ -208,15 +206,10 @@ public class BulkDataExportProvider {
resourceTypes = ArrayUtil.commaSeparatedListToCleanSet(theType.getValueAsString());
}
Date since = null;
if (theSince != null) {
since = theSince.getValue();
}
//TODO GGG eventually, we will support these things.
Set<String> filters = null;
if (theTypeFilter != null) {
filters = ArrayUtil.commaSeparatedListToCleanSet(theTypeFilter.getValueAsString());
}
Date since = null;
IBulkDataExportSvc.JobInfo outcome = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(outputFormat, resourceTypes, since, filters, theIdParam, theMdm.getValue()));
String serverBase = getServerBase(theRequestDetails);

BulkDataExportSvcImpl.java

@@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
@@ -43,6 +44,7 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
@@ -63,6 +65,7 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
@@ -135,9 +138,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
BulkExportJobEntity bulkExportJobEntity = jobToProcessOpt.get();
String jobUuid = bulkExportJobEntity.getJobId();
boolean isForGroupExport = containsGroupId(bulkExportJobEntity.getRequest());
String theGroupId = getGroupIdIfPresent(bulkExportJobEntity.getRequest());
try {
processJob(jobUuid, isForGroupExport);
processJob(jobUuid, theGroupId);
} catch (Exception e) {
ourLog.error("Failure while preparing bulk export extract", e);
myTxTemplate.execute(t -> {
@@ -153,14 +156,9 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
}
private boolean containsGroupId(String theRequestString) {
private String getGroupIdIfPresent(String theRequestString) {
Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
String[] strings = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
if (strings != null && strings.length > 0) {
return true;
} else {
return false;
}
return String.join(",", stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID));
}
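One caveat with the new helper: UrlUtil.parseQueryString returns a map whose get(...) yields null when the export group id parameter is absent, and String.join(",", (CharSequence[]) null) throws a NullPointerException, so plain (non-group) exports would hit this path. A defensive sketch (a hardening assumption, not what this commit does):

private String getGroupIdIfPresent(String theRequestString) {
	Map<String, String[]> stringMap = UrlUtil.parseQueryString(theRequestString);
	String[] groupIds = stringMap.get(JpaConstants.PARAM_EXPORT_GROUP_ID);
	// Absent for non-group bulk exports; String.join would NPE on a null array.
	return groupIds == null ? null : String.join(",", groupIds);
}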
@@ -211,18 +209,19 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
}
private void processJob(String theJobUuid, boolean theIsGroupRequest) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.addLong("readChunkSize", READ_CHUNK_SIZE).toJobParameters();
private void processJob(String theJobUuid, String theGroupId) {
JobParametersBuilder parameters = new JobParametersBuilder()
.addString(BulkExportJobConfig.JOB_UUID_PARAMETER, theJobUuid)
.addLong(BulkExportJobConfig.READ_CHUNK_PARAMETER, READ_CHUNK_SIZE);
ourLog.info("Submitting bulk export job {} to job scheduler", theJobUuid);
try {
if (theIsGroupRequest) {
myJobSubmitter.runJob(myGroupBulkExportJob, parameters);
if (!StringUtils.isBlank(theGroupId)) {
parameters.addString(BulkExportJobConfig.GROUP_ID_PARAMETER, theGroupId);
myJobSubmitter.runJob(myGroupBulkExportJob, parameters.toJobParameters());
} else {
myJobSubmitter.runJob(myBulkExportJob, parameters);
myJobSubmitter.runJob(myBulkExportJob, parameters.toJobParameters());
}
} catch (JobParametersInvalidException theE) {
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());