Bulk Export Bug With Many Resources and Low Max File Size (#4506)

* failing test

* fix + changelog

* tweak

* add method to IJobPersistence to use a Stream

* tweak

* tweak

* decrease test time

* clean up

* code review comments

* version bump

* Increase timeout limit to match BulkExportUseCaseTest

* shorten test

* maintenance pass

* add logging

* Revert "add logging"

This reverts commit b0453fd9538d6858e6bd212f402d4b04c6306136.

* Revert "maintenance pass"

This reverts commit bbc7418d519260c4b9827546a0bf822db1c78b1b.

* test

* trying to fix BulkDataExportTest testGroupBulkExportNotInGroup_DoesNotShowUp

* shorten tests

* logging

* move test location

* fixes a regression caused by my change in hapi-fhir

* timeout

* Revert "fixes a regression caused my change in hapi-fhir"

This reverts commit 4b58013149836eededc568d295c5baf8fb3df989.

* testing

* Revert "testing"

This reverts commit aafc95c2f36bf89ae935c6c342017ec300fe4289.

---------

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-mbp.home>
Nathan Doef 2023-02-10 13:14:29 -05:00 committed by GitHub
parent fb0512f78f
commit 9754a72046
89 changed files with 352 additions and 176 deletions


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,14 +4,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -0,0 +1,6 @@
---
type: fix
issue: 4511
jira: SMILE-6064
title: "Previously, bulk export jobs were getting stuck in the `FINALIZE` state when performed
with many resources and a low Bulk Export File Maximum Capacity. This has been fixed."


@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -29,13 +29,13 @@ import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
@ -59,6 +59,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -332,13 +333,21 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#fetchAllWorkChunksForStepStream(String, String)}
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/
@Override
@Deprecated
public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunksForStep(theInstanceId, theStepId, theBatchSize, thePageIndex, theConsumer));
}
@Override
@Transactional(propagation = Propagation.MANDATORY)
public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
return myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId).map((entity) -> toChunk(entity, true));
}
/**
* Update the stored instance
*
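Since the new stream variant is marked @Transactional(propagation = Propagation.MANDATORY), a caller must already hold a transaction and should close the stream to release the underlying database cursor. A minimal caller-side sketch, assuming hypothetical myTxManager, myJobPersistence, and ourLog fields (not part of this commit):

import ca.uhn.fhir.batch2.model.WorkChunk;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.stream.Stream;

// Hypothetical caller: the persistence call requires an active transaction
// (Propagation.MANDATORY), so we open one here with a TransactionTemplate.
private void consumeChunks(String theInstanceId, String theStepId) {
	TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
	txTemplate.executeWithoutResult(theStatus -> {
		// try-with-resources closes the Stream, releasing the DB cursor
		try (Stream<WorkChunk> chunks = myJobPersistence.fetchAllWorkChunksForStepStream(theInstanceId, theStepId)) {
			chunks.forEach(chunk -> ourLog.debug("Reducing chunk {}", chunk.getId()));
		}
	});
}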


@ -30,6 +30,7 @@ import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository {
@ -39,9 +40,16 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository#fetchChunksForStep(String, String)}
*/
@Deprecated
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunksForStep(Pageable thePageRequest, @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
Stream<Batch2WorkChunkEntity> fetchChunksForStep(@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myRecordsProcessed = :rp, e.mySerializedData = null WHERE e.myId = :id")
void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("rp") int theRecordsProcessed, @Param("status") StatusEnum theInProgress);


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -3,7 +3,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -49,6 +49,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -667,9 +668,11 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
assertNotNull(startResponse);
// Run a scheduled pass to build the export
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId(), 60);
await().until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
await()
.atMost(120, TimeUnit.SECONDS)
.until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
// Iterate over the files
String report = myJobRunner.getJobInfo(startResponse.getJobId()).getReport();


@ -6,11 +6,15 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.BulkExportUtils;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.SearchParameterUtil;
@ -43,7 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -51,8 +55,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.awaitility.Awaitility.await;
@ -62,12 +66,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@ -406,6 +410,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@AfterEach
public void after() {
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
myDaoConfig.setBulkExportFileMaximumCapacity(DaoConfig.DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY);
}
@Test
@ -433,6 +438,57 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertThat(typeToContents.get("Observation"), containsString("obs-included"));
assertThat(typeToContents.get("Observation"), not(containsString("obs-excluded")));
}
@Test
public void testBulkExportWithLowMaxFileCapacity() {
final int numPatients = 250;
myDaoConfig.setBulkExportFileMaximumCapacity(1);
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
RequestDetails details = new SystemRequestDetails();
List<String> patientIds = new ArrayList<>();
for(int i = 0; i < numPatients; i++){
String id = "p-"+i;
Patient patient = new Patient();
patient.setId(id);
myPatientDao.update(patient, details);
patientIds.add(id);
}
int patientsCreated = myPatientDao.search(SearchParameterMap.newSynchronous(), details).size();
assertEquals(numPatients, patientsCreated);
BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
Batch2JobStartResponse job = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));
myBatch2JobHelper.awaitJobCompletion(job.getJobId(), 60);
ourLog.debug("Job status after awaiting - {}", myJobRunner.getJobInfo(job.getJobId()).getStatus());
await()
.atMost(300, TimeUnit.SECONDS)
.until(() -> {
BulkExportJobStatusEnum status = myJobRunner.getJobInfo(job.getJobId()).getStatus();
if (!BulkExportJobStatusEnum.COMPLETE.equals(status)) {
fail("Job status was changed from COMPLETE to " + status);
}
return myJobRunner.getJobInfo(job.getJobId()).getReport() != null;
});
String report = myJobRunner.getJobInfo(job.getJobId()).getReport();
BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
List<String> binaryUrls = results.getResourceTypeToBinaryIds().get("Patient");
IParser jsonParser = myFhirContext.newJsonParser();
for(String url : binaryUrls){
Binary binary = myClient.read().resource(Binary.class).withUrl(url).execute();
assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
String resourceContents = new String(binary.getContent(), Constants.CHARSET_UTF8);
String resourceId = jsonParser.parseResource(resourceContents).getIdElement().getIdPart();
assertTrue(patientIds.contains(resourceId));
}
}
}


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-caching-api</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>


@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir-serviceloaders</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -7,7 +7,7 @@
<parent>
<artifactId>hapi-fhir</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<artifactId>hapi-fhir</artifactId>
<groupId>ca.uhn.hapi.fhir</groupId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public interface IJobPersistence {
@ -196,14 +197,24 @@ public interface IJobPersistence {
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/**
* Deprecated, use {@link ca.uhn.fhir.batch2.api.IJobPersistence#fetchAllWorkChunksForStepStream(String, String)}
* Fetch all chunks with data for a given instance for a given step id
* @param theInstanceId
* @param theStepId
* @return - an iterator for fetching work chunks
*/
@Deprecated
Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId);
/**
* Fetch all chunks with data for a given instance for a given step id
* @param theInstanceId
* @param theStepId
* @return - a stream for fetching work chunks
*/
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
* Update the stored instance. If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater}
* instead to ensure state-change callbacks are invoked properly.


@ -39,6 +39,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
public abstract class BaseBatch2Config {
@ -56,8 +57,8 @@ public abstract class BaseBatch2Config {
}
@Bean
public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
return new WorkChunkProcessor(myPersistence, theBatchJobSender);
public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender, PlatformTransactionManager theTransactionManager) {
return new WorkChunkProcessor(myPersistence, theBatchJobSender, theTransactionManager);
}
@Bean


@ -0,0 +1,52 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.model.WorkChunk;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public class ReductionStepChunkProcessingResponse {
private List<String> mySuccessfulChunkIds;
private List<String> myFailedChunksIds;
private boolean myIsSuccessful;
public ReductionStepChunkProcessingResponse(boolean theDefaultSuccessValue){
mySuccessfulChunkIds = new ArrayList<>();
myFailedChunksIds = new ArrayList<>();
myIsSuccessful = theDefaultSuccessValue;
}
public List<String> getSuccessfulChunkIds() {
return mySuccessfulChunkIds;
}
public boolean hasSuccessfulChunksIds(){
return !CollectionUtils.isEmpty(mySuccessfulChunkIds);
}
public void addSuccessfulChunkId(WorkChunk theWorkChunk){
mySuccessfulChunkIds.add(theWorkChunk.getId());
}
public List<String> getFailedChunksIds() {
return myFailedChunksIds;
}
public boolean hasFailedChunkIds(){
return !CollectionUtils.isEmpty(myFailedChunksIds);
}
public void addFailedChunkId(WorkChunk theWorkChunk){
myFailedChunksIds.add(theWorkChunk.getId());
}
public boolean isSuccessful(){
return myIsSuccessful;
}
public void setSuccessful(boolean theSuccessValue){
myIsSuccessful = theSuccessValue;
}
}
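A short, hypothetical usage sketch of this accumulator (ReductionStepExecutor.processChunk below is the real driver), with theChunks as a placeholder collection: once any chunk fails, every subsequent chunk is recorded as failed as well.

// Hypothetical driver loop over work chunks (theChunks is a placeholder):
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(true);
for (WorkChunk chunk : theChunks) {
	if (response.hasFailedChunkIds()) {
		response.addFailedChunkId(chunk); // fail-fast: later chunks are failed too
	} else {
		response.addSuccessfulChunkId(chunk);
	}
}
if (!response.hasSuccessfulChunksIds()) {
	response.setSuccessful(false); // no successful chunks means the step failed
}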


@ -31,17 +31,23 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
public class ReductionStepExecutor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IJobPersistence myJobPersistence;
private final PlatformTransactionManager myTxManager;
private final TransactionTemplate myTxTemplate;
public ReductionStepExecutor(IJobPersistence theJobPersistence) {
public ReductionStepExecutor(IJobPersistence theJobPersistence, PlatformTransactionManager theTransactionManager) {
myJobPersistence = theJobPersistence;
myTxManager = theTransactionManager;
myTxTemplate = new TransactionTemplate(theTransactionManager);
myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
}
/**
@ -67,86 +73,32 @@ public class ReductionStepExecutor {
}
theInstance.setStatus(StatusEnum.FINALIZE);
// We fetch all chunks first...
Iterator<WorkChunk> chunkIterator = myJobPersistence.fetchAllWorkChunksForStepIterator(theInstance.getInstanceId(), theStep.getStepId());
List<String> failedChunks = new ArrayList<>();
List<String> successfulChunkIds = new ArrayList<>();
boolean retval = true;
boolean defaultSuccessValue = true;
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
try {
while (chunkIterator.hasNext()) {
WorkChunk chunk = chunkIterator.next();
if (!chunk.getStatus().isIncomplete()) {
// This should never happen since jobs with reduction are required to be gated
ourLog.error("Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be complete.", chunk.getId(), chunk.getStatus(), theInstance);
continue;
myTxTemplate.executeWithoutResult((status) -> {
try(Stream<WorkChunk> chunkIterator2 = myJobPersistence.fetchAllWorkChunksForStepStream(theInstance.getInstanceId(), theStep.getStepId())) {
chunkIterator2.forEach((chunk) -> {
processChunk(chunk, theInstance, theInputType, theParameters, reductionStepWorker, response);
});
}
if (!failedChunks.isEmpty()) {
// we are going to fail all future chunks now
failedChunks.add(chunk.getId());
} else {
try {
// feed them into our reduction worker
// this is the most likely area to throw,
// as this is where db actions and processing is likely to happen
ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<>(chunk.getData(theInputType), theParameters, theInstance.getInstanceId(), chunk.getId());
ChunkOutcome outcome = reductionStepWorker.consume(chunkDetails);
switch (outcome.getStatuss()) {
case SUCCESS:
successfulChunkIds.add(chunk.getId());
break;
case ABORT:
ourLog.error("Processing of work chunk {} resulted in aborting job.", chunk.getId());
// fail entire job - including all future workchunks
failedChunks.add(chunk.getId());
retval = false;
break;
case FAIL:
// non-idempotent; but failed chunks will be
// ignored on a second runthrough of reduction step
myJobPersistence.markWorkChunkAsFailed(chunk.getId(),
"Step worker failed to process work chunk " + chunk.getId());
retval = false;
break;
}
} catch (Exception e) {
String msg = String.format(
"Reduction step failed to execute chunk reduction for chunk %s with exception: %s.",
chunk.getId(),
e.getMessage()
);
// we got a failure in a reduction
ourLog.error(msg, e);
retval = false;
myJobPersistence.markWorkChunkAsFailed(chunk.getId(), msg);
}
}
}
});
} finally {
if (!successfulChunkIds.isEmpty()) {
if (response.hasSuccessfulChunksIds()) {
// complete the steps without making a new work chunk
myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(),
successfulChunkIds,
response.getSuccessfulChunkIds(),
StatusEnum.COMPLETED,
null // error message - none
);
}
if (!failedChunks.isEmpty()) {
if (response.hasFailedChunkIds()) {
// mark any failed chunks as failed for aborting
myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(),
failedChunks,
response.getFailedChunksIds(),
StatusEnum.FAILED,
"JOB ABORTED");
}
@ -154,10 +106,72 @@ public class ReductionStepExecutor {
}
// if no successful chunks, return false
if (successfulChunkIds.isEmpty()) {
retval = false;
if (!response.hasSuccessfulChunksIds()) {
response.setSuccessful(false);
}
return retval;
return response.isSuccessful();
}
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
void processChunk(WorkChunk theChunk,
JobInstance theInstance,
Class<IT> theInputType,
PT theParameters,
IReductionStepWorker<PT, IT, OT> theReductionStepWorker,
ReductionStepChunkProcessingResponse theResponseObject){
if (!theChunk.getStatus().isIncomplete()) {
// This should never happen since jobs with reduction are required to be gated
ourLog.error("Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be complete.", theChunk.getId(), theChunk.getStatus(), theInstance);
return;
}
if (theResponseObject.hasFailedChunkIds()) {
// we are going to fail all future chunks now
theResponseObject.addFailedChunkId(theChunk);
} else {
try {
// feed them into our reduction worker
// this is the most likely area to throw,
// as this is where db actions and processing is likely to happen
ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<>(theChunk.getData(theInputType), theParameters, theInstance.getInstanceId(), theChunk.getId());
ChunkOutcome outcome = theReductionStepWorker.consume(chunkDetails);
switch (outcome.getStatus()) {
case SUCCESS:
theResponseObject.addSuccessfulChunkId(theChunk);
break;
case ABORT:
ourLog.error("Processing of work chunk {} resulted in aborting job.", theChunk.getId());
// fail entire job - including all future workchunks
theResponseObject.addFailedChunkId(theChunk);
theResponseObject.setSuccessful(false);
break;
case FAIL:
// non-idempotent; but failed chunks will be
// ignored on a second runthrough of reduction step
myJobPersistence.markWorkChunkAsFailed(theChunk.getId(),
"Step worker failed to process work chunk " + theChunk.getId());
theResponseObject.setSuccessful(false);
break;
}
} catch (Exception e) {
String msg = String.format(
"Reduction step failed to execute chunk reduction for chunk %s with exception: %s.",
theChunk.getId(),
e.getMessage()
);
// we got a failure in a reduction
ourLog.error(msg, e);
theResponseObject.setSuccessful(false);
myJobPersistence.markWorkChunkAsFailed(theChunk.getId(), msg);
}
}
}
}


@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
@ -153,6 +154,11 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
return myWrap.fetchAllWorkChunksForStepIterator(theInstanceId, theStepId);
}
@Override
public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
return myWrap.fetchAllWorkChunksForStepStream(theInstanceId, theStepId);
}
@Override
public synchronized boolean updateInstance(JobInstance theInstance) {
return myWrap.updateInstance(theInstance);


@ -36,6 +36,7 @@ import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nullable;
import java.util.Optional;
@ -63,11 +64,12 @@ public class WorkChunkProcessor {
private final ReductionStepExecutor myReductionStepExecutor;
public WorkChunkProcessor(IJobPersistence theJobPersistence,
BatchJobSender theSender) {
BatchJobSender theSender,
PlatformTransactionManager theTransactionManager) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theSender;
myStepExecutor = new StepExecutor(theJobPersistence);
myReductionStepExecutor = new ReductionStepExecutor(theJobPersistence);
myReductionStepExecutor = new ReductionStepExecutor(theJobPersistence, theTransactionManager);
}
/**


@ -33,7 +33,7 @@ public class ChunkOutcome {
myStatus = theStatus;
}
public Status getStatuss() {
public Status getStatus() {
return myStatus;
}


@ -36,6 +36,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nonnull;
import java.util.Arrays;
@ -43,7 +44,6 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -69,6 +69,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock
private IJobMaintenanceService myJobMaintenanceService;
@Mock
private PlatformTransactionManager myPlatformTransactionManager;
@Captor
private ArgumentCaptor<StepExecutionDetails<TestJobParameters, VoidModel>> myStep1ExecutionDetailsCaptor;
@ -87,7 +89,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
public void beforeEach() {
// The code was refactored to keep the same functionality,
// but in this service (so it's a real service here!)
WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender);
WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, myPlatformTransactionManager);
mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry, jobStepExecutorSvc, myJobMaintenanceService);
}


@ -31,6 +31,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
@ -104,8 +105,8 @@ public class WorkChunkProcessorTest {
// our test class
private class TestWorkChunkProcessor extends WorkChunkProcessor {
public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender) {
super(thePersistence, theSender);
public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender, PlatformTransactionManager theTransactionManager) {
super(thePersistence, theSender, theTransactionManager);
}
@Override
@ -138,11 +139,14 @@ public class WorkChunkProcessorTest {
@Mock
private BatchJobSender myJobSender;
@Mock
private PlatformTransactionManager myMockTransactionManager;
private TestWorkChunkProcessor myExecutorSvc;
@BeforeEach
public void init() {
myExecutorSvc = new TestWorkChunkProcessor(myJobPersistence, myJobSender);
myExecutorSvc = new TestWorkChunkProcessor(myJobPersistence, myJobSender, myMockTransactionManager);
}
private <OT extends IModelJson> JobDefinitionStep<TestJobParameters, StepInputData, OT> mockOutWorkCursor(
@ -197,8 +201,8 @@ public class WorkChunkProcessorTest {
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepIterator(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS());
@ -259,8 +263,8 @@ public class WorkChunkProcessorTest {
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepIterator(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
doThrow(new RuntimeException(errorMsg))
.when(myReductionStep).consume(any(ChunkExecutionDetails.class));
@ -308,8 +312,8 @@ public class WorkChunkProcessorTest {
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepIterator(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS())
@ -355,8 +359,8 @@ public class WorkChunkProcessorTest {
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.markInstanceAsStatus(eq(INSTANCE_ID), eq(StatusEnum.FINALIZE))).thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksForStepIterator(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForStepStream(eq(INSTANCE_ID), eq(REDUCTION_STEP_ID)))
.thenReturn(chunks.stream());
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS())
.thenReturn(new ChunkOutcome(ChunkOutcome.Status.ABORT));
@ -609,7 +613,7 @@ public class WorkChunkProcessorTest {
verify(myJobPersistence, never())
.markWorkChunksWithStatusAndWipeData(anyString(), anyList(), any(), any());
verify(myJobPersistence, never())
.fetchAllWorkChunksForStepIterator(anyString(), anyString());
.fetchAllWorkChunksForStepStream(anyString(), anyString());
}
private JobInstance getTestJobInstance() {


@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -97,6 +97,8 @@ public class DaoConfig {
public static final int DEFAULT_BUNDLE_BATCH_POOL_SIZE = 20; // 1 for single thread
public static final int DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE = 100; // 1 for single thread
public static final int DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY = 200;
public static final int DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY = 1_000;
/**
* Default value for {@link #setMaximumSearchResultCountInTransaction(Integer)}
*
@ -332,7 +334,7 @@ public class DaoConfig {
/**
* Since 6.2.0
*/
private int myBulkExportFileMaximumCapacity = 1_000;
private int myBulkExportFileMaximumCapacity = DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY;
/**
* Since 6.4.0
*/
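For context, this is the knob the bug report refers to: a test can lower it to force one resource per output file, then restore the new default. A hedged sketch mirroring the @AfterEach added to BulkExportUseCaseTest in this commit (myDaoConfig is the autowired DaoConfig from the test base class):

// Sketch only: exaggerate file splitting, then restore the default capacity.
myDaoConfig.setBulkExportFileMaximumCapacity(1); // one resource per bulk-export file
try {
	// ... start a bulk export job and await COMPLETE plus a non-null report ...
} finally {
	myDaoConfig.setBulkExportFileMaximumCapacity(DaoConfig.DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY);
}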


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


@ -7,7 +7,7 @@
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<packaging>pom</packaging>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<name>HAPI-FHIR</name>
<description>An open-source implementation of the FHIR specification in Java.</description>
<url>https://hapifhir.io</url>
@ -2140,7 +2140,7 @@
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-checkstyle</artifactId>
<!-- Remember to bump this when you upgrade the version -->
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
</dependency>
</dependencies>
</plugin>


@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>


@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>


@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.3.14-SNAPSHOT</version>
<version>6.3.15-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>