Code review comments

Tadgh 2020-06-09 19:00:41 -07:00
parent b1d2ab7619
commit 942fc313ec
15 changed files with 106 additions and 54 deletions

View File

@@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.batch.log;
/*-
* #%L
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Logs {
private static final Logger ourBatchTroubleshootingLog = LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting");
public static Logger getBatchTroubleshootingLog() {
return ourBatchTroubleshootingLog;
}
}

View File

@@ -3,10 +3,12 @@ package ca.uhn.fhir.jpa.batch.processors;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -14,11 +16,13 @@ import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources
*/
public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@@ -41,6 +45,7 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourceP
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null);
ourLog.trace("Loaded resources: {}", outgoing.stream().map(Object::toString).collect(Collectors.joining(", ")));
return outgoing;
}
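
For reference, a chunked step consumes this processor between a reader that emits pages of ResourcePersistentIds and a writer that takes the loaded resources. A minimal wiring sketch, assuming illustrative bean names and chunk size (this wiring is not part of the commit):

// Illustrative wiring only -- bean names and chunk size are assumptions, not from this commit.
@Bean
public Step exampleLoadResourcesStep(StepBuilderFactory theStepBuilderFactory,
                                     ItemReader<List<ResourcePersistentId>> theIdReader,
                                     PidToIBaseResourceProcessor theProcessor,
                                     ItemWriter<List<IBaseResource>> theResourceWriter) {
	return theStepBuilderFactory.get("exampleLoadResourcesStep")
		.<List<ResourcePersistentId>, List<IBaseResource>>chunk(1)
		.reader(theIdReader)
		.processor(theProcessor)
		.writer(theResourceWriter)
		.build();
}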

View File

@@ -72,7 +72,7 @@ public class BulkExportJobConfig {
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
-.partitioner("bulkExportGenerateResourceFilesStep", partitioner(null))
+.partitioner("bulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner(null))
.step(bulkExportGenerateResourceFilesStep())
.taskExecutor(myTaskExecutor)
.build();
@@ -88,7 +88,7 @@ public class BulkExportJobConfig {
@Bean
@JobScope
-public ResourceTypePartitioner partitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
+public ResourceTypePartitioner bulkExportResourceTypePartitioner(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {
return new ResourceTypePartitioner(theJobUUID);
}
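
Reviewer note on the rename: calling bulkExportResourceTypePartitioner(null) inside partitionStep() is the standard Spring Batch late-binding idiom, not a bug. Because the bean is @JobScope, Spring returns a scoped proxy from the @Bean method, and the real jobUUID is resolved from the running execution's JobParameters via the @Value("#{jobParameters['jobUUID']}") expression; the null argument is never used. The rename just makes the bean's purpose obvious at the call site.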

View File

@@ -32,8 +32,7 @@ public class BulkExportJobStatusChangeListener implements JobExecutionListener {
if (theJobExecution.getStatus() == BatchStatus.COMPLETED) {
myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.COMPLETE);
} else {
-//If the job didn't complete successfully, just set it back to submitted so that it gets picked up again by the scheduler.
-myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.SUBMITTED);
+myBulkExportDaoSvc.setJobToStatus(myJobUUID, BulkJobStatusEnum.ERROR);
}
}
}
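
The behavioral change here is worth calling out: previously a job that did not complete was reset to SUBMITTED, so the scheduler would silently pick it up and retry it; with this change a failed execution is marked ERROR and stays that way. Callers polling job status can now distinguish "not yet run" from "ran and failed".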

View File

@@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
@@ -15,7 +16,6 @@ import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
@@ -29,9 +29,10 @@ import java.util.List;
import java.util.Optional;
public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
-private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class);
+private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
-private static final int READ_CHUNK_SIZE = 10;
+@Value("#{jobParameters['readChunkSize']}")
+private Long READ_CHUNK_SIZE;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
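
One caveat with the new @Value("#{jobParameters['readChunkSize']}") binding: the jobParameters SpEL context only resolves on step- or job-scoped beans, so BulkItemReader has to be declared with one of those scopes. A sketch of what that declaration would look like (an assumption; the actual bean wiring is not shown in this diff):

// Assumption: the reader is registered roughly like this elsewhere in the config.
@Bean
@StepScope
public BulkItemReader bulkItemReader() {
	// jobParameters['readChunkSize'] is injected when the owning step starts
	return new BulkItemReader();
}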

View File

@@ -19,14 +19,26 @@ public class JobExistsParameterValidator implements JobParametersValidator {
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
+if (theJobParameters == null) {
+throw new JobParametersInvalidException("This job requires Parameters: [readChunkSize] and [jobUUID]");
+}
+Long readChunkSize = theJobParameters.getLong("readChunkSize");
+String errorMessage = "";
+if (readChunkSize == null || readChunkSize < 1) {
+errorMessage += "There must be a valid number for readChunkSize, which is at least 1. ";
+}
String jobUUID = theJobParameters.getString("jobUUID");
if (StringUtils.isBlank(jobUUID)) {
-throw new JobParametersInvalidException("You did not pass a jobUUID to this job!");
+errorMessage += "You did not pass a jobUUID to this job! ";
}
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
if (!oJob.isPresent()) {
-throw new JobParametersInvalidException("There is no persisted job that exists with UUID: " + jobUUID);
+errorMessage += "There is no persisted job that exists with UUID: " + jobUUID + ". ";
}
+if (!StringUtils.isEmpty(errorMessage)) {
+throw new JobParametersInvalidException(errorMessage);
+}
}
}
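
With the validator now accumulating problems into one message, a single run surfaces every bad parameter instead of failing on the first. For context, a JobParametersValidator only takes effect if it is attached to the job definition; a hypothetical sketch of where it and the status listener from this commit would plug in (bean names assumed, the real job definition is elsewhere):

// Hypothetical job definition -- bean references are illustrative.
@Bean
public Job bulkExportJob(JobBuilderFactory theJobBuilderFactory) {
	return theJobBuilderFactory.get("bulkExportJob")
		.validator(jobExistsParameterValidator())
		.start(partitionStep())
		.listener(bulkExportJobStatusChangeListener())
		.build();
}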

View File

@@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.bulk.job;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.svc.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.parser.IParser;
@@ -22,10 +23,8 @@ import java.io.OutputStreamWriter;
import java.util.List;
import java.util.Optional;
-import static org.slf4j.LoggerFactory.getLogger;
public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
-private static final Logger ourLog = getLogger(ResourceToFileWriter.class);
+private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private FhirContext myContext;

View File

@@ -71,6 +71,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private static final Long READ_CHUNK_SIZE = 10L;
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataExportSvcImpl.class);
private int myReuseBulkExportForMillis = (int) (60 * DateUtils.MILLIS_PER_MINUTE);
@@ -191,6 +192,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
private void processJob(String theJobUuid) {
JobParameters parameters = new JobParametersBuilder()
.addString("jobUUID", theJobUuid)
.addLong("readChunkSize", READ_CHUNK_SIZE)
.toJobParameters();
try {

View File

@@ -37,7 +37,7 @@ public class BulkExportDaoSvc {
Optional<BulkExportCollectionEntity> byId = myBulkExportCollectionDao.findById(theCollectionEntityId);
if (byId.isPresent()) {
BulkExportCollectionEntity exportCollectionEntity = byId.get();
-theFile.setCollection(exportCollectionEntity);;
+theFile.setCollection(exportCollectionEntity);
myBulkExportCollectionFileDao.saveAndFlush(theFile);
myBulkExportCollectionDao.saveAndFlush(exportCollectionEntity);
}

View File

@@ -259,11 +259,14 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, Sets.newHashSet("Patient", "Observation"), null, null);
//Add the UUID to the job
-JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId());
+JobParametersBuilder paramBuilder = new JobParametersBuilder()
+.addString("jobUUID", jobDetails.getJobId())
+.addLong("readChunkSize", 10L);
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
IBulkDataExportSvc.JobInfo jobInfo = awaitJobCompletion(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
@@ -273,7 +276,6 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
fail("Should have had invalid parameter execption!");
} catch (JobParametersInvalidException e) {
}
}

View File

@@ -7,5 +7,13 @@ import org.springframework.batch.core.JobParametersInvalidException;
public interface IBatchJobSubmitter {
/**
* Given a {@link Job} and a {@link JobParameters}, execute the job with the given parameters.
*
* @param theJob the job to run.
* @param theJobParameters A collection of key-value pairs that are used to parameterize the job.
* @return A {@link JobExecution} representing the job.
* @throws JobParametersInvalidException If validation on the parameters fails.
*/
JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException;
}
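
A usage sketch for the documented contract, using the parameter names this changeset standardizes on (the UUID value is made up):

// Illustrative caller -- values are assumptions; parameter names come from this changeset.
public JobExecution submitExampleJob(IBatchJobSubmitter theSubmitter, Job theJob)
		throws JobParametersInvalidException {
	JobParameters params = new JobParametersBuilder()
		.addString("jobUUID", "123e4567-e89b-12d3-a456-426614174000")
		.addLong("readChunkSize", 10L)
		.toJobParameters();
	return theSubmitter.runJob(theJob, params);
}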

View File

@@ -2,8 +2,6 @@ package ca.uhn.fhir.jpa.batch.config;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
-import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
@@ -16,8 +14,8 @@ import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.PostConstruct;
-@Configuration
@EnableBatchProcessing
+@Configuration
public class InMemoryJobRepositoryBatchConfig implements BatchConfigurer {
private PlatformTransactionManager myPlatformTransactionManager;

View File

@@ -1,8 +1,5 @@
package ca.uhn.fhir.jpa.batch.config;
-import ca.uhn.fhir.interceptor.api.HookParams;
-import ca.uhn.test.concurrency.IPointcutLatch;
-import ca.uhn.test.concurrency.PointcutLatch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@@ -11,15 +8,12 @@ import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
-public class BatchJobConfig implements IPointcutLatch {
+public class BatchJobConfig {
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@@ -27,11 +21,9 @@ public class BatchJobConfig implements IPointcutLatch {
@Autowired
private StepBuilderFactory myStepBuilderFactory;
-private final PointcutLatch myPointcutLatch = new PointcutLatch("batchJobLatch");
@Bean
-public Job datJob() {
+public Job testJob() {
return myJobBuilderFactory.get("testJob")
.start(testStep())
.build();
@@ -39,11 +31,8 @@ public class BatchJobConfig implements IPointcutLatch {
@Bean
public Step testStep() {
-//return myStepBuilderFactory.get("testStep").tasklet(sampleTasklet()).build();
return myStepBuilderFactory.get("testStep")
-.<String, String>chunk(100)
-.reader(reader())
-.writer(simpleWriter())
+.tasklet(sampleTasklet())
.build();
}
@@ -61,25 +50,6 @@ public class BatchJobConfig implements IPointcutLatch {
@Bean
public ItemWriter<String> simpleWriter() {
-return new ItemWriter<String>() {
-@Override
-public void write(List<? extends String> theList) throws Exception {
-theList.forEach(System.out::println);
-}
-};
-}
-@Override
-public void clear() {
-myPointcutLatch.clear();
-}
-@Override
-public void setExpectedCount(int count) {
-myPointcutLatch.setExpectedCount(count);
-}
-@Override
-public List<HookParams> awaitExpected() throws InterruptedException {
-return myPointcutLatch.awaitExpected();
+return theList -> theList.forEach(System.out::println);
}
}
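
The simplified test step now delegates to sampleTasklet(), which this diff references but does not show. A minimal tasklet consistent with that call might look like the following (an assumption, not the actual bean):

// Assumed shape of the referenced sampleTasklet() bean.
@Bean
public Tasklet sampleTasklet() {
	return (theContribution, theChunkContext) -> {
		// perform the single unit of work for the step
		return RepeatStatus.FINISHED;
	};
}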

View File

@@ -16,9 +16,7 @@ public class BatchSvcTest extends BaseBatchR4Test {
@Test
public void testApplicationContextLoads() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, InterruptedException {
-myBatchJobConfig.setExpectedCount(1);
myJobLauncher.run(myJob, new JobParameters());
-myBatchJobConfig.awaitExpected();
}
}

View File

@@ -47,6 +47,11 @@
<logger name="org.hibernate" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!--
Configuration for EMPI troubleshooting log
-->
<appender name="EMPI_TROUBLESHOOTING" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>DEBUG</level></filter>
<file>${smile.basedir}/log/empi-troubleshooting.log</file>
@@ -67,6 +72,28 @@
</logger>
<!--
Configuration for Spring Batch troubleshooting log
-->
<appender name="BATCH_TROUBLESHOOTING" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>DEBUG</level></filter>
<file>${smile.basedir}/log/batch-troubleshooting.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${smile.basedir}/log/batch-troubleshooting.log.%i.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>5MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n${log.stackfilter.pattern}</pattern>
</encoder>
</appender>
<logger name="ca.uhn.fhir.log.batch_troubleshooting" level="TRACE">
<appender-ref ref="BATCH_TROUBLESHOOTING"/>
</logger>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
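
Tying the pieces together: the ca.uhn.fhir.log.batch_troubleshooting logger configured above is the same one exposed by the new Logs class, so any batch component gets file-backed TRACE output. An illustrative component (not from this commit):

// Illustrative only -- demonstrates how the new troubleshooting log is obtained and used.
import ca.uhn.fhir.jpa.batch.log.Logs;
import org.slf4j.Logger;

public class ExampleBatchComponent {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	public void doWork(String theJobUUID) {
		ourLog.trace("Processing bulk export job {}", theJobUUID);
	}
}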