trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNo… (#4527)

* trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNotShowUp

* added change log

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
This commit is contained in:
TipzCM 2023-02-08 16:54:25 -05:00 committed by GitHub
parent 71ea1b49f2
commit 6b3b954c77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 1 deletions

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 4526
title: "Fixing an issue where a long running reduction step causes
the message not to be processed fast enough, thereby allowing
multiple reduction step jobs to start.
"

View File

@ -36,6 +36,7 @@ import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
@ -43,6 +44,9 @@ import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* This handler receives batch work request messages and performs the batch work requested by the message
@ -112,7 +116,27 @@ class WorkChannelMessageHandler implements MessageHandler {
}
JobStepExecutor<?,?,?> stepExecutor = myJobStepExecutorFactory.newJobStepExecutor(instance, workChunk, cursor);
stepExecutor.executeStep();
// TODO - ls
/*
* We should change this to actually have
* the reduction step take in smaller sets of
* lists of chunks from the previous steps (one
* at a time still) and compose the
* report gradually and in an idempotent way
*/
if (isReductionWorkNotification) {
// do async due to long running process
// we'll fire off a separate thread and let the job continue
ScheduledExecutorService exService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "Reduction-step-thread");
}
});
exService.execute(stepExecutor::executeStep);
} else {
stepExecutor.executeStep();
}
}
private void markInProgressIfQueued(JobInstance theInstance) {