More code review comments
This commit is contained in:
parent
942fc313ec
commit
c36e1d2899
|
@ -58,7 +58,7 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||||
private void loadResourcePids() {
|
private void loadResourcePids() {
|
||||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
||||||
if (!jobOpt.isPresent()) {
|
if (!jobOpt.isPresent()) {
|
||||||
ourLog.info("Job appears to be deleted");
|
ourLog.warn("Job appears to be deleted");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
myJobEntity = jobOpt.get();
|
myJobEntity = jobOpt.get();
|
||||||
|
|
|
@ -27,7 +27,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||||
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private FhirContext myContext;
|
private FhirContext myFhirContext;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DaoRegistry myDaoRegistry;
|
private DaoRegistry myDaoRegistry;
|
||||||
|
@ -52,7 +52,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void start() {
|
public void start() {
|
||||||
myParser = myContext.newJsonParser().setPrettyPrint(false);
|
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
|
||||||
myBinaryDao = getBinaryDao();
|
myBinaryDao = getBinaryDao();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private IIdType createBinaryFromOutputStream() {
|
private IIdType createBinaryFromOutputStream() {
|
||||||
IBaseBinary binary = BinaryUtil.newBinary(myContext);
|
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
|
||||||
binary.setContentType(Constants.CT_FHIR_NDJSON);
|
binary.setContentType(Constants.CT_FHIR_NDJSON);
|
||||||
binary.setContent(myOutputStream.toByteArray());
|
binary.setContent(myOutputStream.toByteArray());
|
||||||
|
|
||||||
|
@ -97,6 +97,8 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<IIdType> createdId = flushToFiles();
|
Optional<IIdType> createdId = flushToFiles();
|
||||||
createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {} resources of type ", theIIdType.toUnqualifiedVersionless().getValue()));
|
if (createdId.isPresent()) {
|
||||||
|
ourLog.info("Created resources for bulk export file containing {} resources of type ", createdId.get().toUnqualifiedVersionless().getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,11 +28,7 @@ public class ResourceTypePartitioner implements Partitioner {
|
||||||
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
|
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
|
||||||
|
|
||||||
Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap( myJobUUID);
|
Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap( myJobUUID);
|
||||||
//observation -> obs1.json, obs2.json, obs3.json BulkJobCollectionEntity
|
|
||||||
//bulk Collection Entity ID -> patient
|
|
||||||
|
|
||||||
// 123123-> Patient
|
|
||||||
// 91876389126-> Observation
|
|
||||||
idToResourceType.entrySet().stream()
|
idToResourceType.entrySet().stream()
|
||||||
.forEach(entry -> {
|
.forEach(entry -> {
|
||||||
String resourceType = entry.getValue();
|
String resourceType = entry.getValue();
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class BulkExportDaoSvc {
|
||||||
private BulkExportJobEntity loadJob(String theJobUUID) {
|
private BulkExportJobEntity loadJob(String theJobUUID) {
|
||||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUUID);
|
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(theJobUUID);
|
||||||
if (!jobOpt.isPresent()) {
|
if (!jobOpt.isPresent()) {
|
||||||
ourLog.info("Job appears to be deleted");
|
ourLog.warn("Job with UUID {} appears to be deleted", theJobUUID);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return jobOpt.get();
|
return jobOpt.get();
|
||||||
|
|
|
@ -35,7 +35,4 @@ public interface IBulkExportCollectionDao extends JpaRepository<BulkExportCollec
|
||||||
@Modifying
|
@Modifying
|
||||||
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
|
@Query("DELETE FROM BulkExportCollectionEntity t WHERE t.myId = :pid")
|
||||||
void deleteByPid(@Param("pid") Long theId);
|
void deleteByPid(@Param("pid") Long theId);
|
||||||
|
|
||||||
// @Query("SELECT BulkExportCollectionEntity ")
|
|
||||||
// void findByJobId(Long theId);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -264,7 +265,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
||||||
.addLong("readChunkSize", 10L);
|
.addLong("readChunkSize", 10L);
|
||||||
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
|
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
|
||||||
|
|
||||||
IBulkDataExportSvc.JobInfo jobInfo = awaitJobCompletion(jobDetails.getJobId());
|
awaitJobCompletion(jobDetails.getJobId());
|
||||||
|
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
|
||||||
|
|
||||||
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
|
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
|
||||||
assertThat(jobInfo.getFiles().size(), equalTo(2));
|
assertThat(jobInfo.getFiles().size(), equalTo(2));
|
||||||
}
|
}
|
||||||
|
@ -281,14 +284,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
public IBulkDataExportSvc.JobInfo awaitJobCompletion(String theJobId) throws InterruptedException {
|
public IBulkDataExportSvc.JobInfo awaitJobCompletion(String theJobId) throws InterruptedException {
|
||||||
while(true) {
|
await().until(() -> myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId).getStatus() == BulkJobStatusEnum.COMPLETE);
|
||||||
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId);
|
|
||||||
if (jobInfo.getStatus() != BulkJobStatusEnum.COMPLETE) {
|
|
||||||
Thread.sleep(1000L);
|
|
||||||
} else {
|
|
||||||
return jobInfo;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -8,14 +8,12 @@ import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
|
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
|
||||||
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
|
||||||
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
|
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
|
||||||
import ca.uhn.fhir.model.dstu2.resource.Provenance;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
|
||||||
|
|
||||||
import javax.persistence.EntityManagerFactory;
|
import javax.persistence.EntityManagerFactory;
|
||||||
|
|
||||||
|
@ -43,7 +41,8 @@ public class TestJPAConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
I had to rename this bean as it was clashing with Spring Batch `transactionManager` in SimpleBatchConfiguration
|
Please do not rename this bean to "transactionManager()" as this will conflict with the transactionManager
|
||||||
|
provided by Spring Batch.
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@Primary
|
@Primary
|
||||||
|
|
Loading…
Reference in New Issue