updating tests

This commit is contained in:
leif stawnyczy 2024-03-13 14:32:51 -04:00
parent 88a916ece5
commit 480052581a
15 changed files with 218 additions and 143 deletions

View File

@ -54,7 +54,6 @@ import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.hibernate.Session;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
@ -65,7 +64,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.sql.Connection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@ -297,9 +295,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
int updated = myWorkChunkRepository.updateChunkStatus(
theChunkId, WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.READY);
if (updated == 1) {
theCallback.accept(updated);
}
theCallback.accept(updated);
}
@Override

View File

@ -2215,28 +2215,28 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
p.addName().addFamily(methodName);
IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system2").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system3").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
p = new Patient();
p.addIdentifier().setSystem("urn:system4").setValue(methodName);
p.addName().addFamily(methodName);
IIdType id4 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
sleepUntilTimeChanges();
sleep1MS();
SearchParameterMap pm;
List<IIdType> actual;

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
@ -23,6 +24,10 @@ import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
import ca.uhn.fhir.testjob.TestJobDefinitionUtils;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.testjob.models.ReductionStepOutput;
import ca.uhn.fhir.testjob.models.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
@ -43,9 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* The on-enter actions are defined in
* {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater#handleStatusChange}
* {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater#handleStatusChange(JobInstance)}}
* {@link ca.uhn.fhir.batch2.progress.InstanceProgress#updateStatus(JobInstance)}
* {@link JobInstanceProcessor#cleanupInstance()}
* {@link ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor#cleanupInstance()}
* For chunks:
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#onWorkChunkCreate}
@ -59,9 +64,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobMaintenanceIT.class);
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
@Autowired
JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired
@ -125,6 +127,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myBatch2JobHelper.awaitJobHasStatus(batchJobId, StatusEnum.QUEUED);
myFirstStepLatch.awaitExpected();
@ -159,12 +162,12 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
public void testFirstStepToSecondStepFasttrackingDisabled_singleChunkDoesNotFasttrack() throws InterruptedException {
myStorageSettings.setJobFastTrackingEnabled(false);
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, VoidModel, Batch2JobMaintenanceIT.FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new Batch2JobMaintenanceIT.FirstStepOutput());
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<Batch2JobMaintenanceIT.TestJobParameters, Batch2JobMaintenanceIT.FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobDefId = new Exception().getStackTrace()[0].getMethodName();
@ -176,7 +179,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
String batchJobId = myJobCoordinator.startInstance(request).getInstanceId();
String batchJobId = myJobCoordinator.startInstance(new SystemRequestDetails(), request).getInstanceId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertFastTracking(batchJobId);
@ -203,54 +206,19 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@Nonnull
private JobDefinition<? extends IModelJson> buildGatedJobDefinition(String theJobId, IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep, IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep) {
return JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addLastStep(
LAST_STEP_ID,
"Test last step",
theLastStep
)
.completionHandler(myCompletionHandler)
.build();
return TestJobDefinitionUtils.buildGatedJobDefinition(
theJobId,
theFirstStep,
theLastStep,
myCompletionHandler
);
}
static class TestJobParameters implements IModelJson {
TestJobParameters() {
}
}
static class FirstStepOutput implements IModelJson {
FirstStepOutput() {
}
}
static class SecondStepOutput implements IModelJson {
@JsonProperty("test")
private String myTestValue;
SecondStepOutput() {
}
public void setValue(String theV) {
myTestValue = theV;
}
}
static class ReductionStepOutput implements IModelJson {
static class OurReductionStepOutput extends ReductionStepOutput {
@JsonProperty("result")
private List<?> myResult;
ReductionStepOutput(List<?> theResult) {
OurReductionStepOutput(List<?> theResult) {
myResult = theResult;
}
}

View File

@ -2,7 +2,10 @@ package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
@ -19,10 +22,13 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.testjob.TestJobDefinitionUtils;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import jakarta.annotation.Nonnull;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@ -36,7 +42,6 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;
import jakarta.annotation.Nonnull;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -64,9 +69,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = "step-chunkId";
public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID;
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
@ -81,6 +86,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Autowired
public Batch2JobHelper myBatch2JobHelper;
@Autowired
public JobDefinitionRegistry myJobDefinitionRegistry;
@Test
public void testDeleteInstance() {
// Setup
@ -104,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@ -350,7 +358,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
sleepUntilTimeChanges();
sleep1MS();
// Test
runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId));
@ -367,11 +375,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_NoData() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@ -410,7 +422,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(instanceId, workChunk.getInstanceId());
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
assertEquals(0, workChunk.getSequence());
assertEquals(WorkChunkStatusEnum.QUEUED, workChunk.getStatus());
assertEquals(WorkChunkStatusEnum.READY, workChunk.getStatus());
assertNotNull(workChunk.getCreateTime());
@ -445,13 +457,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
@ -464,14 +477,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -483,7 +496,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
sleep1MS();
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
@ -535,20 +548,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
sleep1MS();
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.onWorkChunkError(request);
@ -587,20 +600,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Test
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
JobInstance instance = createInstance(true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
sleep1MS();
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
@ -675,15 +688,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
.orElseThrow(IllegalArgumentException::new));
}
private JobInstance createInstance() {
return createInstance(false);
}
@Nonnull
private JobInstance createInstance() {
private JobInstance createInstance(boolean theCreateJobDefBool) {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
if (theCreateJobDefBool) {
JobDefinition<?> jobDef = TestJobDefinitionUtils.buildGatedJobDefinition(
JOB_DEFINITION_ID,
(step, sink) -> {
sink.accept(new FirstStepOutput());
return RunOutcome.SUCCESS;
},
(step, sink) -> {
return RunOutcome.SUCCESS;
},
theDetails -> {
}
);
if (myJobDefinitionRegistry.getJobDefinition(jobDef.getJobDefinitionId(), jobDef.getJobDefinitionVersion()).isEmpty()) {
myJobDefinitionRegistry.addJobDefinition(jobDef);
}
}
return instance;
}

View File

@ -881,6 +881,7 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
options.setResourceTypes(Sets.newHashSet("Patient", "Observation", "CarePlan", "MedicationAdministration", "ServiceRequest"));
options.setExportStyle(BulkExportJobParameters.ExportStyle.PATIENT);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
verifyBulkExportResults(options, List.of("Patient/P1", carePlanId, medAdminId, sevReqId, obsSubId, obsPerId), Collections.emptyList());
}
@ -1078,8 +1079,9 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
for (Map.Entry<String, List<String>> file : results.getResourceTypeToBinaryIds().entrySet()) {
String resourceType = file.getKey();
List<String> binaryIds = file.getValue();
System.out.println("xxxxxxxxxxxxxxxx");
System.out.println(resourceType + " binary with ids " + String.join(", ", binaryIds));
for (var nextBinaryId : binaryIds) {
String nextBinaryIdPart = new IdType(nextBinaryId).getIdPart();
assertThat(nextBinaryIdPart, matchesPattern("[a-zA-Z0-9]{32}"));
@ -1090,6 +1092,8 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
try (var iter = new LineIterator(new StringReader(nextNdJsonFileContent))) {
iter.forEachRemaining(t -> {
if (isNotBlank(t)) {
System.out.println("xxxxxxx");
System.out.println(t);
IBaseResource next = myFhirContext.newJsonParser().parseResource(t);
IIdType nextId = next.getIdElement().toUnqualifiedVersionless();
if (!resourceType.equals(nextId.getResourceType())) {

View File

@ -93,7 +93,6 @@ import org.springframework.transaction.support.TransactionTemplate;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -582,13 +581,13 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
p.addName().setFamily("family");
final IIdType id = myPatientDao.create(p, mySrd).getId().toUnqualified();
sleepUntilTimeChanges();
sleep1MS();
ValueSet vs = new ValueSet();
vs.setUrl("http://foo");
myValueSetDao.create(vs, mySrd);
sleepUntilTimeChanges();
sleep1MS();
ResourceTable entity = new TransactionTemplate(myTxManager).execute(t -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(1), entity.getIndexStatus());

View File

@ -358,7 +358,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
}
sleepUntilTimeChanges();
sleep1MS();
myReindexTestHelper.createAlleleSearchParameter();
mySearchParamRegistry.forceRefresh();

View File

@ -30,22 +30,22 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
createPatient(withActiveFalse());
sleepUntilTimeChanges();
sleep1MS();
Date start = new Date();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long id2 = createObservation(withObservationCode("http://foo", "bar")).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date end = new Date();
sleepUntilTimeChanges();
sleep1MS();
createPatient(withActiveFalse());
@ -103,26 +103,26 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Setup
final Long patientId0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
// Start of resources within range
Date start = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long patientId1 = createPatient(withActiveFalse()).getIdPartAsLong();
createObservation(withObservationCode("http://foo", "bar"));
createObservation(withObservationCode("http://foo", "bar"));
sleepUntilTimeChanges();
sleep1MS();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
sleep1MS();
Long patientId2 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
Date end = new Date();
sleepUntilTimeChanges();
sleep1MS();
// End of resources within range
createObservation(withObservationCode("http://foo", "bar"));
final Long patientId3 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
sleep1MS();
// Execute

View File

@ -0,0 +1,50 @@
package ca.uhn.fhir.testjob;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.testjob.models.FirstStepOutput;
import ca.uhn.fhir.testjob.models.TestJobParameters;
public class TestJobDefinitionUtils {

	public static final int TEST_JOB_VERSION = 1;
	public static final String FIRST_STEP_ID = "first-step";
	public static final String LAST_STEP_ID = "last-step";

	/**
	 * Static utility class; not meant to be instantiated.
	 */
	private TestJobDefinitionUtils() {
		// no instances
	}

	/**
	 * Creates a two-step, gated test job definition with a completion handler.
	 * The first step is registered under {@link #FIRST_STEP_ID} and the last
	 * step under {@link #LAST_STEP_ID}; the definition uses version
	 * {@link #TEST_JOB_VERSION} and {@link TestJobParameters} as its parameter type.
	 *
	 * @param theJobId            the job definition id
	 * @param theFirstStep        worker invoked for the first step; emits {@link FirstStepOutput}
	 * @param theLastStep         worker invoked for the last step; consumes {@link FirstStepOutput}
	 * @param theCompletionHandler handler invoked when the job instance completes
	 * @return the built gated job definition
	 */
	public static JobDefinition<? extends IModelJson> buildGatedJobDefinition(
			String theJobId,
			IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
			IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> theLastStep,
			IJobCompletionHandler<TestJobParameters> theCompletionHandler) {
		return JobDefinition.newBuilder()
				.setJobDefinitionId(theJobId)
				.setJobDescription("test job")
				.setJobDefinitionVersion(TEST_JOB_VERSION)
				.setParametersType(TestJobParameters.class)
				.gatedExecution()
				.addFirstStep(
						FIRST_STEP_ID,
						"Test first step",
						FirstStepOutput.class,
						theFirstStep
				)
				.addLastStep(
						LAST_STEP_ID,
						"Test last step",
						theLastStep
				)
				.completionHandler(theCompletionHandler)
				.build();
	}
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
 * Empty output model for the first step of test job definitions.
 * Exists only to satisfy the batch2 step-output type parameter
 * (see {@code TestJobDefinitionUtils.buildGatedJobDefinition}); carries no data.
 */
public class FirstStepOutput implements IModelJson {
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
 * Base model for reduction-step outputs in batch2 tests.
 * Intentionally empty; tests subclass it to add result fields
 * (e.g. {@code OurReductionStepOutput} in Batch2JobMaintenanceIT).
 */
public class ReductionStepOutput implements IModelJson {
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.testjob.models;
import ca.uhn.fhir.model.api.IModelJson;
/**
 * Empty parameters model for test job definitions.
 * Used as the {@code setParametersType} of jobs built by
 * {@code TestJobDefinitionUtils}; carries no data.
 */
public class TestJobParameters implements IModelJson {
}

View File

@ -609,7 +609,7 @@ public abstract class BaseJpaTest extends BaseTest {
/**
* Sleep until at least 1 ms has elapsed
*/
public void sleepUntilTimeChanges() {
public void sleep1MS() {
StopWatch sw = new StopWatch();
await().until(() -> sw.getMillis() > 0);
}

View File

@ -32,8 +32,6 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
@ -41,11 +39,9 @@ import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
@ -216,7 +212,7 @@ public class JobInstanceProcessor {
nextStepId);
// otherwise, continue processing as expected
processChunksForNextSteps(theInstance, nextStepId);
processChunksForNextGatedSteps(theInstance, theJobDefinition, nextStepId);
}
} else {
ourLog.debug(
@ -242,15 +238,13 @@ public class JobInstanceProcessor {
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all work chunks complete -> go to next step
return true;
}
// if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// // all work chunks complete -> go to next step
// return true;
// }
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))
&& theJobDefinition.getStepById(theStepId).isReductionStep()) {
// all workchunks ready && last step is reduction step;
// proceed
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
// all workchunks ready -> proceed
return true;
}
@ -281,12 +275,14 @@ public class JobInstanceProcessor {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
if (theJobDefinition.isGatedExecution()
&& jobWorkCursor.isFinalStep()
&& jobWorkCursor.isReductionStep()) {
// reduction steps are processed by
// ReductionStepExecutorServiceImpl
// which does not wait for steps off the queue.
// so we will not process them here
|| jobWorkCursor.isReductionStep()) {
/*
* Gated executions are queued later when all work chunks are ready.
*
* Reduction steps are not submitted to the queue at all, but processed inline.
* Currently all reduction steps are also gated, but this might not always
* be true.
*/
return;
}
@ -316,6 +312,7 @@ public class JobInstanceProcessor {
WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
ourLog.debug("Updated {} workchunk with id {}", updated, chunkId);
if (updated == 1) {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
@ -331,30 +328,33 @@ public class JobInstanceProcessor {
// we'll log and skip it. If it's still in the DB, the next pass
// will pick it up. Otherwise, it's no longer important
ourLog.error(
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; skipping work chunk.",
"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
theInstance.getInstanceId(),
theChunk.getId());
theChunk.getId(),
updated);
}
});
}
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
private void processChunksForNextGatedSteps(JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> queuedChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
List<String> readyChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
if (totalChunksForNextStep != queuedChunksForNextStep.size()) {
if (totalChunksForNextStep != readyChunksForNextStep.size()) {
ourLog.debug(
"Total ProgressAccumulator QUEUED chunk count does not match QUEUED chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
"Total ProgressAccumulator READY chunk count does not match READY chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
instanceId,
nextStepId,
totalChunksForNextStep,
queuedChunksForNextStep.size());
readyChunksForNextStep.size());
}
// Note on sequence: we don't have XA transactions, and are talking to two stores (JPA + Queue)
// Sequence: 1 - So we run the query to minimize the work overlapping.
List<String> chunksToSubmit =
myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.QUEUED);
// List<String> chunksToSubmit =
// myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
// Sequence: 2 - update the job step so the workers will process them.
boolean changed = myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
@ -364,24 +364,28 @@ public class JobInstanceProcessor {
instance.setCurrentGatedStepId(nextStepId);
return true;
});
if (!changed) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
enqueueReadyChunks(theInstance, theJobDefinition);
// DESIGN GAP: if we die here, these chunks will never be queued.
// Need a WAITING stage before QUEUED for chunks, so we can catch them.
// Sequence: 3 - send the notifications
for (String nextChunkId : chunksToSubmit) {
JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
ourLog.debug(
"Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]",
chunksToSubmit.size(),
instanceId,
nextStepId);
// for (String nextChunkId : chunksToSubmit) {
// JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
// myBatchJobSender.sendWorkChannelMessage(workNotification);
// }
// ourLog.debug(
// "Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]",
// chunksToSubmit.size(),
// instanceId,
// nextStepId);
}
}

View File

@ -184,7 +184,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
.thenReturn(Arrays.asList(existingInProgInstance));
// test
Batch2JobStartResponse startResponse = mySvc.startInstance(startRequest);
Batch2JobStartResponse startResponse = mySvc.startInstance(new SystemRequestDetails(), startRequest);
// verify
assertEquals(inProgressInstanceId, startResponse.getInstanceId()); // make sure it's the completed one