This commit is contained in:
leif stawnyczy 2024-03-14 14:44:13 -04:00
parent 480052581a
commit 3c51682452
10 changed files with 67 additions and 73 deletions

View File

@ -127,6 +127,8 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
System.out.println("==========");
System.out.println("Creating workchunk " + entity.getId() + " for " + entity.getTargetStepId() + " in " + entity.getStatus().name());
return entity.getId();
}
@ -357,13 +359,17 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
.execute(() -> {
System.out.println("xxxxxxxxxxx");
System.out.println("onWorkChunkCompletion " + theEvent.getChunkId() + " " + WorkChunkStatusEnum.COMPLETED.name());
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
theEvent.getChunkId(),
new Date(),
theEvent.getRecordsProcessed(),
theEvent.getRecoveredErrorCount(),
WorkChunkStatusEnum.COMPLETED,
theEvent.getRecoveredWarningMessage()));
theEvent.getRecoveredWarningMessage());
});
}
@Nullable

View File

@ -300,7 +300,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");
@ -391,7 +391,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");

View File

@ -1079,8 +1079,6 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
for (Map.Entry<String, List<String>> file : results.getResourceTypeToBinaryIds().entrySet()) {
String resourceType = file.getKey();
List<String> binaryIds = file.getValue();
System.out.println("xxxxxxxxxxxxxxxx");
System.out.println(resourceType + " binary with ids " + String.join(", ", binaryIds));
for (var nextBinaryId : binaryIds) {
String nextBinaryIdPart = new IdType(nextBinaryId).getIdPart();
assertThat(nextBinaryIdPart, matchesPattern("[a-zA-Z0-9]{32}"));
@ -1092,8 +1090,6 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
try (var iter = new LineIterator(new StringReader(nextNdJsonFileContent))) {
iter.forEachRemaining(t -> {
if (isNotBlank(t)) {
System.out.println("xxxxxxx");
System.out.println(t);
IBaseResource next = myFhirContext.newJsonParser().parseResource(t);
IIdType nextId = next.getIdElement().toUnqualifiedVersionless();
if (!resourceType.equals(nextId.getResourceType())) {

View File

@ -56,13 +56,13 @@ public class PatientReindexTestHelper {
public static Stream<Arguments> numResourcesParams(){
return Stream.of(
Arguments.of(0),
Arguments.of(1),
Arguments.of(499),
Arguments.of(500),
Arguments.of(750),
Arguments.of(1000),
Arguments.of(1001)
// Arguments.of(0),
Arguments.of(1)//,
// Arguments.of(499),
// Arguments.of(500),
// Arguments.of(750),
// Arguments.of(1000),
// Arguments.of(1001)
);
}

View File

@ -77,8 +77,6 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
String dataValueString = JsonUtil.serialize(dataValue, false);
// once finished, create workchunks in READY state
// the JobMaintenanceServiceImpl will transition these to
// QUEUED when necessary
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myHapiTransactionService

View File

@ -43,6 +43,7 @@ import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class JobInstanceProcessor {
@ -99,7 +100,7 @@ public class JobInstanceProcessor {
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(theInstance, jobDefinition);
enqueueReadyChunks(theInstance, jobDefinition, false);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
@ -173,7 +174,8 @@ public class JobInstanceProcessor {
}
private void triggerGatedExecutions(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
if (!theInstance.isRunning()) {
// QUEUE'd jobs that are gated need to start; this step will do that
if (!theInstance.isRunning() && (theInstance.getStatus() != StatusEnum.QUEUED && theJobDefinition.isGatedExecution())) {
ourLog.debug(
"JobInstance {} is not in a \"running\" state. Status {}",
theInstance.getInstanceId(),
@ -188,22 +190,20 @@ public class JobInstanceProcessor {
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
theJobDefinition, theInstance.getCurrentGatedStepId());
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
ourLog.debug("Job instance {} is in final step and it's not a reducer step", theInstance.getInstanceId());
return;
}
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId);
if (canAdvance) {
// current step is the reduction step (a final step)
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else if (jobWorkCursor.isFinalStep()) {
// current step is the final step in a non-reduction gated job
processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId());
} else {
// all other gated job steps
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -212,7 +212,7 @@ public class JobInstanceProcessor {
nextStepId);
// otherwise, continue processing as expected
processChunksForNextGatedSteps(theInstance, theJobDefinition, nextStepId);
processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId);
}
} else {
ourLog.debug(
@ -238,10 +238,11 @@ public class JobInstanceProcessor {
return true;
}
// if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// // all work chunks complete -> go to next step
// return true;
// }
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all previous workchunks are complete;
// none in READY though -> still proceed
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) {
// all workchunks ready -> proceed
@ -262,20 +263,20 @@ public class JobInstanceProcessor {
* we'd need a new GATE_WAITING state to move chunks to to prevent jobs from
* completing prematurely.
*/
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionAdvancementBool) {
// we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db)
TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
AtomicInteger counter = new AtomicInteger();
readyChunks.forEach(chunk -> {
JobWorkCursor<?, ?, ?> jobWorkCursor =
JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId());
if (theJobDefinition.isGatedExecution()
|| jobWorkCursor.isReductionStep()) {
counter.getAndIncrement();
if (!theIsGatedExecutionAdvancementBool && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* Gated executions are queued later when all work chunks are ready.
*
@ -297,6 +298,8 @@ public class JobInstanceProcessor {
});
myTransactionManager.commit(status);
ourLog.debug("Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
}
/**
@ -312,7 +315,7 @@ public class JobInstanceProcessor {
WorkChunk theChunk, JobInstance theInstance, JobDefinition<?> theJobDefinition) {
String chunkId = theChunk.getId();
myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
ourLog.debug("Updated {} workchunk with id {}", updated, chunkId);
ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
if (updated == 1) {
// send to the queue
// we use current step id because it has not been moved to the next step (yet)
@ -336,7 +339,7 @@ public class JobInstanceProcessor {
});
}
private void processChunksForNextGatedSteps(JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
private void processChunksForNextGatedSteps(JobInstance theInstance, JobDefinition<?> theJobDefinition, JobWorkCursor<?, ?, ?> theWorkCursor, String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> readyChunksForNextStep =
myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
@ -350,13 +353,10 @@ public class JobInstanceProcessor {
readyChunksForNextStep.size());
}
// Note on sequence: we don't have XA transactions, and are talking to two stores (JPA + Queue)
// Sequence: 1 - So we run the query to minimize the work overlapping.
// List<String> chunksToSubmit =
// myJobPersistence.fetchAllChunkIdsForStepWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY);
// Sequence: 2 - update the job step so the workers will process them.
boolean changed = myJobPersistence.updateInstance(instanceId, instance -> {
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyways
boolean changed = theWorkCursor.isFinalStep() || myJobPersistence.updateInstance(instanceId, instance -> {
if (instance.getCurrentGatedStepId().equals(nextStepId)) {
// someone else beat us here. No changes
return false;
@ -365,27 +365,14 @@ public class JobInstanceProcessor {
return true;
});
if (!changed) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
enqueueReadyChunks(theInstance, theJobDefinition);
// DESIGN GAP: if we die here, these chunks will never be queued.
// Need a WAITING stage before QUEUED for chunks, so we can catch them.
// Sequence: 3 - send the notifications
// for (String nextChunkId : chunksToSubmit) {
// JobWorkNotification workNotification = new JobWorkNotification(theInstance, nextStepId, nextChunkId);
// myBatchJobSender.sendWorkChannelMessage(workNotification);
// }
// ourLog.debug(
// "Submitted a batch of chunks for processing. [chunkCount={}, instanceId={}, stepId={}]",
// chunksToSubmit.size(),
// instanceId,
// nextStepId);
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);
}
}

View File

@ -98,6 +98,7 @@ public class JobInstanceProgressCalculator {
while (workChunkIterator.hasNext()) {
WorkChunk next = workChunkIterator.next();
// global stats
myProgressAccumulator.addChunk(next);
// instance stats

View File

@ -35,6 +35,8 @@ import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -111,14 +113,7 @@ class JobDataSinkTest {
// theDataSink.accept(output) called by firstStepWorker above calls two services. Let's validate them both.
verify(myBatchJobSender).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
JobWorkNotification notification = myJobWorkNotificationCaptor.getValue();
assertEquals(JOB_DEF_ID, notification.getJobDefinitionId());
assertEquals(JOB_INSTANCE_ID, notification.getInstanceId());
assertEquals(CHUNK_ID, notification.getChunkId());
assertEquals(JOB_DEF_VERSION, notification.getJobDefinitionVersion());
assertEquals(LAST_STEP_ID, notification.getTargetStepId());
verify(myBatchJobSender, never()).sendWorkChannelMessage(any());
WorkChunkCreateEvent batchWorkChunk = myBatchWorkChunkCaptor.getValue();
assertEquals(JOB_DEF_VERSION, batchWorkChunk.jobDefinitionVersion);

View File

@ -488,7 +488,7 @@ public class WorkChunkProcessorTest {
WorkChunk chunk = new WorkChunk();
chunk.setInstanceId(INSTANCE_ID);
chunk.setId(theId);
chunk.setStatus(WorkChunkStatusEnum.QUEUED);
chunk.setStatus(WorkChunkStatusEnum.READY);
chunk.setData(JsonUtil.serialize(
new StepInputData()
));

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -239,7 +240,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
List<WorkChunk> chunks = Arrays.asList(
JobCoordinatorImplTest.createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setId(CHUNK_ID + "abc"),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID_2)
);
@ -255,7 +255,10 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(chunks.stream().filter(c -> c.getStatus() == WorkChunkStatusEnum.READY));
.thenAnswer((args) -> {
// new stream every time
return new ArrayList<>(chunks).stream();
});
doAnswer(a -> {
Consumer<Integer> callback = a.getArgument(1);
callback.accept(1);
@ -268,7 +271,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Verify
verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture());
verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any());
verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any());
verifyNoMoreInteractions(myJobPersistence);
JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload();
assertEquals(STEP_2, payload0.getTargetStepId());
@ -390,7 +393,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenAnswer(t -> theChunks.stream());
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator());
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.READY)).toList().iterator());
// test
mySvc.runMaintenancePass();
@ -454,6 +457,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY)
);
JobInstance instance = createInstance();
instance.setCurrentGatedStepId(STEP_2);
myLogCapture.setUp(Level.ERROR);
@ -463,6 +468,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
consumer.accept(0); // nothing processed
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
doAnswer(args -> {
IJobPersistence.JobInstanceUpdateCallback callback = args.getArgument(1);
callback.doUpdate(instance);
return true;
}).when(myJobPersistence).updateInstance(any(), any());
// test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());