From 6b3b954c7740078e965789c0acd7a7ff1655d58a Mon Sep 17 00:00:00 2001 From: TipzCM Date: Wed, 8 Feb 2023 16:54:25 -0500 Subject: [PATCH] =?UTF-8?q?trying=20to=20fix=20BulkDataExportTest=20testGr?= =?UTF-8?q?oupBulkExportNotInGroup=5FDoesNo=E2=80=A6=20(#4527)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNotShowUp * added change log --------- Co-authored-by: leif stawnyczy --- .../6_4_0/4526-fix-reduction-step.yaml | 7 +++++ .../WorkChannelMessageHandler.java | 26 ++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml new file mode 100644 index 00000000000..81ed230ca2c --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml @@ -0,0 +1,7 @@ +--- +type: fix +issue: 4526 +title: "Fixing an issue where a long running reduction step causes + the message not to be processed fast enough, thereby allowing + multiple reduction step jobs to start. + " diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java index a8fa8a00d25..1920ef0a372 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java @@ -36,6 +36,7 @@ import ca.uhn.fhir.batch2.util.Batch2Constants; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.Logs; import org.apache.commons.lang3.Validate; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -43,6 +44,9 @@ import org.springframework.messaging.MessagingException; import javax.annotation.Nonnull; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; /** * This handler receives batch work request messages and performs the batch work requested by the message @@ -112,7 +116,27 @@ class WorkChannelMessageHandler implements MessageHandler { } JobStepExecutor stepExecutor = myJobStepExecutorFactory.newJobStepExecutor(instance, workChunk, cursor); - stepExecutor.executeStep(); + // TODO - ls + /* + * We should change this to actually have + * the reduction step take in smaller sets of + * lists of chunks from the previous steps (one + * at a time still) and compose the + * report gradually and in an idempotent way + */ + if (isReductionWorkNotification) { + // do async due to long running process + // we'll fire off a separate thread and let the job continue + ScheduledExecutorService exService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(@NotNull Runnable r) { + return new Thread(r, "Reduction-step-thread"); + } + }); + exService.execute(stepExecutor::executeStep); + } else { + stepExecutor.executeStep(); + } } private void markInProgressIfQueued(JobInstance theInstance) {