Increase min thread count for batch. Add new which waits for all bulkexport jobs to be done

This commit is contained in:
Tadgh 2020-06-24 13:16:11 -07:00
parent 99fe66bb58
commit 64dbd038c9
2 changed files with 33 additions and 4 deletions

View File

@ -28,14 +28,19 @@ import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
@ -59,6 +64,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
private IBulkDataExportSvc myBulkDataExportSvc; private IBulkDataExportSvc myBulkDataExportSvc;
@Autowired @Autowired
private IBatchJobSubmitter myBatchJobSubmitter; private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
private JobExplorer myJobExplorer;
@Autowired @Autowired
@Qualifier("bulkExportJob") @Qualifier("bulkExportJob")
@ -165,6 +172,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Run a scheduled pass to build the export // Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles(); myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again // Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus()); assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
@ -235,6 +244,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Run a scheduled pass to build the export // Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles(); myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again // Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus()); assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
@ -260,6 +271,25 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
} }
} }
public void awaitAllBulkJobCompletions() {
List<JobInstance> bulkExport = myJobExplorer.findJobInstancesByJobName("bulkExportJob", 0, 100);
if (bulkExport.isEmpty()) {
fail("There are no bulk export jobs running!");
}
List<JobExecution> bulkExportExecutions = bulkExport.stream().flatMap(jobInstance -> myJobExplorer.getJobExecutions(jobInstance).stream()).collect(Collectors.toList());
awaitJobCompletions(bulkExportExecutions);
}
public void awaitJobCompletions(Collection<JobExecution> theJobs) {
theJobs.stream().forEach(jobExecution -> {
try {
awaitJobCompletion(jobExecution);
} catch (InterruptedException theE) {
fail();
}
});
}
@Test @Test
public void testSubmitReusesExisting() { public void testSubmitReusesExisting() {
@ -345,6 +375,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
// Run a scheduled pass to build the export // Run a scheduled pass to build the export
myBulkDataExportSvc.buildExportFiles(); myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
// Fetch the job again // Fetch the job again
status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId()); status = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());

View File

@ -25,7 +25,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -35,8 +34,6 @@ import static org.junit.Assert.fail;
@EnableTransactionManagement() @EnableTransactionManagement()
public class TestR4Config extends BaseJavaConfigR4 { public class TestR4Config extends BaseJavaConfigR4 {
CountDownLatch jobLatch = new CountDownLatch(1);
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class);
public static Integer ourMaxThreads; public static Integer ourMaxThreads;
@ -47,7 +44,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
* starvation * starvation
*/ */
if (ourMaxThreads == null) { if (ourMaxThreads == null) {
ourMaxThreads = (int) (Math.random() * 6.0) + 1; ourMaxThreads = (int) (Math.random() * 6.0) + 2;
if ("true".equals(System.getProperty("single_db_connection"))) { if ("true".equals(System.getProperty("single_db_connection"))) {
ourMaxThreads = 1; ourMaxThreads = 1;