Try to avoid intermittent
This commit is contained in:
parent
56ef55a353
commit
b2a4ff2f29
|
@ -26,7 +26,10 @@ import ca.uhn.fhir.util.BundleBuilder;
|
|||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Order;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -58,6 +61,7 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuilder {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportR4Test.class);
|
||||
|
@ -80,75 +84,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
|
|||
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyFailAfterThreeCreatesInterceptor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlow_TransactionRows() {
|
||||
int transactionsPerFile = 10;
|
||||
int fileCount = 10;
|
||||
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
|
||||
|
||||
BulkImportJobJson job = new BulkImportJobJson();
|
||||
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
|
||||
job.setJobDescription("testFlow_TransactionRows");
|
||||
job.setBatchSize(3);
|
||||
String jobId = mySvc.createNewJob(job, files);
|
||||
mySvc.markJobAsReadyForActivation(jobId);
|
||||
|
||||
boolean activateJobOutcome = mySvc.activateNextReadyJob();
|
||||
assertTrue(activateJobOutcome);
|
||||
|
||||
List<JobExecution> executions = awaitAllBulkImportJobCompletion();
|
||||
assertEquals("testFlow_TransactionRows", executions.get(0).getJobParameters().getString(BulkExportJobConfig.JOB_DESCRIPTION));
|
||||
|
||||
runInTransaction(() -> {
|
||||
List<BulkImportJobEntity> jobs = myBulkImportJobDao.findAll();
|
||||
assertEquals(0, jobs.size());
|
||||
|
||||
List<BulkImportJobFileEntity> jobFiles = myBulkImportJobFileDao.findAll();
|
||||
assertEquals(0, jobFiles.size());
|
||||
|
||||
});
|
||||
|
||||
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous());
|
||||
assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlow_WithTenantNamesInInput() {
|
||||
int transactionsPerFile = 5;
|
||||
int fileCount = 10;
|
||||
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
|
||||
for (int i = 0; i < fileCount; i++) {
|
||||
files.get(i).setTenantName("TENANT" + i);
|
||||
}
|
||||
|
||||
IAnonymousInterceptor interceptor = mock(IAnonymousInterceptor.class);
|
||||
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, interceptor);
|
||||
|
||||
BulkImportJobJson job = new BulkImportJobJson();
|
||||
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
|
||||
job.setBatchSize(5);
|
||||
String jobId = mySvc.createNewJob(job, files);
|
||||
mySvc.markJobAsReadyForActivation(jobId);
|
||||
|
||||
boolean activateJobOutcome = mySvc.activateNextReadyJob();
|
||||
assertTrue(activateJobOutcome);
|
||||
|
||||
awaitAllBulkImportJobCompletion();
|
||||
|
||||
ArgumentCaptor<HookParams> paramsCaptor = ArgumentCaptor.forClass(HookParams.class);
|
||||
verify(interceptor, times(50)).invoke(any(), paramsCaptor.capture());
|
||||
List<String> tenantNames = paramsCaptor
|
||||
.getAllValues()
|
||||
.stream()
|
||||
.map(t -> t.get(RequestDetails.class).getTenantId())
|
||||
.distinct()
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
assertThat(tenantNames, containsInAnyOrder(
|
||||
"TENANT0", "TENANT1", "TENANT2", "TENANT3", "TENANT4", "TENANT5", "TENANT6", "TENANT7", "TENANT8", "TENANT9"
|
||||
));
|
||||
}
|
||||
|
||||
@Order(-1)
|
||||
@Test
|
||||
public void testFlow_ErrorDuringWrite() {
|
||||
myInterceptorRegistry.registerInterceptor(new MyFailAfterThreeCreatesInterceptor());
|
||||
|
@ -190,6 +126,77 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
|
|||
|
||||
}
|
||||
|
||||
@Order(0)
|
||||
@Test
|
||||
public void testFlow_TransactionRows() {
|
||||
int transactionsPerFile = 10;
|
||||
int fileCount = 10;
|
||||
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
|
||||
|
||||
BulkImportJobJson job = new BulkImportJobJson();
|
||||
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
|
||||
job.setJobDescription("testFlow_TransactionRows");
|
||||
job.setBatchSize(3);
|
||||
String jobId = mySvc.createNewJob(job, files);
|
||||
mySvc.markJobAsReadyForActivation(jobId);
|
||||
|
||||
boolean activateJobOutcome = mySvc.activateNextReadyJob();
|
||||
assertTrue(activateJobOutcome);
|
||||
|
||||
List<JobExecution> executions = awaitAllBulkImportJobCompletion();
|
||||
assertEquals("testFlow_TransactionRows", executions.get(0).getJobParameters().getString(BulkExportJobConfig.JOB_DESCRIPTION));
|
||||
|
||||
runInTransaction(() -> {
|
||||
List<BulkImportJobEntity> jobs = myBulkImportJobDao.findAll();
|
||||
assertEquals(0, jobs.size());
|
||||
|
||||
List<BulkImportJobFileEntity> jobFiles = myBulkImportJobFileDao.findAll();
|
||||
assertEquals(0, jobFiles.size());
|
||||
|
||||
});
|
||||
|
||||
IBundleProvider searchResults = myPatientDao.search(SearchParameterMap.newSynchronous());
|
||||
assertEquals(transactionsPerFile * fileCount, searchResults.sizeOrThrowNpe());
|
||||
}
|
||||
|
||||
@Order(1)
|
||||
@Test
|
||||
public void testFlow_WithTenantNamesInInput() {
|
||||
int transactionsPerFile = 5;
|
||||
int fileCount = 10;
|
||||
List<BulkImportJobFileJson> files = createInputFiles(transactionsPerFile, fileCount);
|
||||
for (int i = 0; i < fileCount; i++) {
|
||||
files.get(i).setTenantName("TENANT" + i);
|
||||
}
|
||||
|
||||
IAnonymousInterceptor interceptor = mock(IAnonymousInterceptor.class);
|
||||
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, interceptor);
|
||||
|
||||
BulkImportJobJson job = new BulkImportJobJson();
|
||||
job.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
|
||||
job.setBatchSize(5);
|
||||
String jobId = mySvc.createNewJob(job, files);
|
||||
mySvc.markJobAsReadyForActivation(jobId);
|
||||
|
||||
boolean activateJobOutcome = mySvc.activateNextReadyJob();
|
||||
assertTrue(activateJobOutcome);
|
||||
|
||||
awaitAllBulkImportJobCompletion();
|
||||
|
||||
ArgumentCaptor<HookParams> paramsCaptor = ArgumentCaptor.forClass(HookParams.class);
|
||||
verify(interceptor, times(50)).invoke(any(), paramsCaptor.capture());
|
||||
List<String> tenantNames = paramsCaptor
|
||||
.getAllValues()
|
||||
.stream()
|
||||
.map(t -> t.get(RequestDetails.class).getTenantId())
|
||||
.distinct()
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
assertThat(tenantNames, containsInAnyOrder(
|
||||
"TENANT0", "TENANT1", "TENANT2", "TENANT3", "TENANT4", "TENANT5", "TENANT6", "TENANT7", "TENANT8", "TENANT9"
|
||||
));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private List<BulkImportJobFileJson> createInputFiles(int transactionsPerFile, int fileCount) {
|
||||
List<BulkImportJobFileJson> files = new ArrayList<>();
|
||||
|
@ -213,6 +220,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
|
|||
return files;
|
||||
}
|
||||
|
||||
@Order(3)
|
||||
@Test
|
||||
public void testJobsAreRegisteredWithJobRegistry() throws NoSuchJobException {
|
||||
Job job = myJobRegistry.getJob(BULK_IMPORT_JOB_NAME);
|
||||
|
|
Loading…
Reference in New Issue