3662 batch2 reduction step (#3681)

* adding a bunch of stuff

* most of the steps are working now

* writing more tests

* adding more tests

* updated the batch2 coordinators

* updating version

* some minor test tweaks

* added a new test

* adding more tests

* blah

* blah

* review fixes 1

* fixing tests and refactoring

* review points

* updated for code review

* review fixes

* updated

* step 1 on review step 5

* final touches

* blah

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
TipzCM 2022-06-20 14:59:01 -04:00 committed by GitHub
parent 46fdc03151
commit 48c28997e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
112 changed files with 2309 additions and 291 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2096
* Last code value: 2099
*/
private Msg() {}

View File

@ -0,0 +1,55 @@
package ca.uhn.fhir.model.api;
import ca.uhn.fhir.i18n.Msg;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
public class PagingIterator<T> implements Iterator<T> {
public interface PageFetcher<T> {
void fetchNextPage(int thePageIndex, int theBatchSize, Consumer<T> theConsumer);
}
static final int PAGE_SIZE = 100;
private int myPage;
private boolean myIsFinished;
private final LinkedList<T> myCurrentBatch = new LinkedList<>();
private final PageFetcher<T> myFetcher;
public PagingIterator(PageFetcher<T> theFetcher) {
myFetcher = theFetcher;
}
@Override
public boolean hasNext() {
fetchNextBatch();
return !myCurrentBatch.isEmpty();
}
@Override
public T next() {
fetchNextBatch();
if (myCurrentBatch.isEmpty()) {
throw new NoSuchElementException(Msg.code(2098) + " Nothing to fetch");
}
return myCurrentBatch.remove(0);
}
private void fetchNextBatch() {
if (!myIsFinished && myCurrentBatch.isEmpty()) {
myFetcher.fetchNextPage(myPage, PAGE_SIZE, myCurrentBatch::add);
myPage++;
myIsFinished = myCurrentBatch.size() < PAGE_SIZE;
}
}
}

View File

@ -15,7 +15,6 @@ import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull;
import java.math.BigDecimal;
import java.util.UUID;

View File

@ -24,7 +24,9 @@ import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.split;
import static org.apache.commons.lang3.StringUtils.trim;
public class ArrayUtil {
@ -42,4 +44,5 @@ public class ArrayUtil {
.collect(Collectors.toSet());
return resourceTypes;
}
}

View File

@ -0,0 +1,90 @@
package ca.uhn.fhir.model.api;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class PagingIteratorTest {
private PagingIterator<String> myPagingIterator;
private List<String> getDataList(int theSize) {
ArrayList<String> data = new ArrayList<>();
for (int i = 0; i < theSize; i++) {
data.add("DataString " + i);
}
return data;
}
private PagingIterator<String> createPagingIterator(int theDataSize) {
List<String> data = getDataList(theDataSize);
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> {
int start = (thePageIndex * theBatchSize);
for (int i = start; i < Math.min(start + theBatchSize, theDataSize); i++) {
theConsumer.accept(data.get(i));
}
});
}
@Test
public void hasNext_returnsTrue_ifElementsAvailable() {
myPagingIterator = createPagingIterator(1);
assertTrue(myPagingIterator.hasNext());
}
@Test
public void hasNext_returnsFalse_ifNoElementsAvialable() {
myPagingIterator = createPagingIterator(0);
assertFalse(myPagingIterator.hasNext());
}
@Test
public void next_whenNextIsAvailable_fetches() {
myPagingIterator = createPagingIterator(10);
String next = myPagingIterator.next();
assertNotNull(next);
assertFalse(next.isEmpty());
}
@Test
public void next_fetchTest_fetchesAndReturns() {
// 3 cases to make sure we get the edge cases
for (int adj : new int[] { -1, 0, 1 }) {
int size = PagingIterator.PAGE_SIZE + adj;
myPagingIterator = createPagingIterator(size);
// test
int count = 0;
while (myPagingIterator.hasNext()) {
myPagingIterator.next();
count++;
}
assertEquals(size, count);
}
}
@Test
public void next_throwsNoSuchElement_whenNoElements() {
myPagingIterator = createPagingIterator(0);
try {
myPagingIterator.next();
fail();
} catch (NoSuchElementException ex) {
assertTrue(ex.getMessage().contains("Nothing to fetch"));
}
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,37 +1,30 @@
package ca.uhn.fhir.okhttp.client;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.*;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.HttpClientUtil;
import ca.uhn.fhir.rest.client.api.IHttpClient;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.impl.BaseHttpClientInvocation;
import ca.uhn.fhir.rest.client.method.MethodUtil;
import okhttp3.Call;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import java.util.List;
import java.util.Map;
import org.hl7.fhir.instance.model.api.IBaseBinary;
/*
* #%L
* HAPI FHIR OkHttp Client
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.*;
import ca.uhn.fhir.rest.client.api.*;
import ca.uhn.fhir.rest.client.impl.BaseHttpClientInvocation;
import ca.uhn.fhir.rest.client.method.MethodUtil;
import okhttp3.*;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.deleteLastCharacter;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.endsWith;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.everythingAfterFirstQuestionMark;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.hasQuestionMark;
import static ca.uhn.fhir.okhttp.utils.UrlStringUtils.withTrailingQuestionMarkRemoved;
/**
* A Http Request based on OkHttp. This is an adapter around the class

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,4 @@
---
issue: 3662
type: add
title: "Added support to batch2 jobs to allow for final reduction step for gated jobs."

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -29,16 +29,21 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.model.api.PagingIterator;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import javax.annotation.Nonnull;
import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -94,6 +99,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setParams(theInstance.getParameters());
entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
entity.setCreateTime(new Date());
entity.setReport(theInstance.getReport());
entity = myJobInstanceRepository.save(entity);
return entity.getId();
@ -167,6 +173,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
retVal.setEstimatedTimeRemaining(theEntity.getEstimatedTimeRemaining());
retVal.setParameters(theEntity.getParams());
retVal.setCurrentGatedStepId(theEntity.getCurrentGatedStepId());
retVal.setReport(theEntity.getReport());
return retVal;
}
@ -185,6 +192,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theChunkId, new Date(), theRecordsProcessed, StatusEnum.COMPLETED);
}
@Override
public void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMsg) {
List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
for (List<String> idList : listOfListOfIds) {
myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(idList, new Date(), theStatus, theErrorMsg);
}
}
@Override
public void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy) {
myWorkChunkRepository.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
@ -192,8 +207,21 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex) {
ArrayList<WorkChunk> chunks = new ArrayList<>();
fetchChunks(theInstanceId, false, thePageSize, thePageIndex, chunks::add);
return chunks;
}
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
return chunks.stream().map(t -> toChunk(t, false)).collect(Collectors.toList());
for (Batch2WorkChunkEntity chunk : chunks) {
theConsumer.accept(toChunk(chunk, theIncludeData));
}
}
@Override
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
}
@Override
@ -214,6 +242,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
instance.setErrorCount(theInstance.getErrorCount());
instance.setEstimatedTimeRemaining(theInstance.getEstimatedTimeRemaining());
instance.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
instance.setReport(theInstance.getReport());
myJobInstanceRepository.save(instance);
}

View File

@ -40,6 +40,10 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myRecordsProcessed = :rp, e.mySerializedData = null WHERE e.myId = :id")
void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("rp") int theRecordsProcessed, @Param("status") StatusEnum theInProgress);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")
void updateAllChunksForInstanceStatusClearDataAndSetError(@Param("ids") List<String> theChunkIds, @Param("et") Date theEndTime, @Param("status") StatusEnum theInProgress, @Param("em") String theError);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id")
void updateChunkStatusAndIncrementErrorCountForEndError(@Param("id") String theChunkId, @Param("et") Date theEndTime, @Param("em") String theErrorMessage, @Param("status") StatusEnum theInProgress);

View File

@ -25,10 +25,12 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.FetchType;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.Lob;
@ -105,6 +107,15 @@ public class Batch2JobInstanceEntity implements Serializable {
@Column(name = "CUR_GATED_STEP_ID", length = ID_MAX_LENGTH, nullable = true)
private String myCurrentGatedStepId;
/**
* Any output from the job can be held in this column
* Even serialized json
*/
@Lob
@Basic(fetch = FetchType.LAZY)
@Column(name = "REPORT", nullable = true, length = Integer.MAX_VALUE - 1)
private String myReport;
public String getCurrentGatedStepId() {
return myCurrentGatedStepId;
}
@ -258,6 +269,14 @@ public class Batch2JobInstanceEntity implements Serializable {
myEstimatedTimeRemaining = left(theEstimatedTimeRemaining, TIME_REMAINING_LENGTH);
}
public String getReport() {
return myReport;
}
public void setReport(String theReport) {
myReport = theReport;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@ -277,6 +296,7 @@ public class Batch2JobInstanceEntity implements Serializable {
.append("progress", myProgress)
.append("errorMessage", myErrorMessage)
.append("estimatedTimeRemaining", myEstimatedTimeRemaining)
.append("report", myReport)
.toString();
}
}

View File

@ -84,6 +84,18 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
init560(); // 20211027 -
init570(); // 20211102 -
init600(); // 20211102 -
init610();
}
private void init610() {
Builder version = forVersion(VersionEnum.V6_1_0);
// add new REPORT column to BATCH2 tables
version
.onTable("BT2_JOB_INSTANCE")
.addColumn("20220601.1", "REPORT")
.nullable()
.type(ColumnTypeEnum.CLOB);
}
private void init600() {

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,26 +1,47 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class Batch2CoordinatorIT extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2CoordinatorIT.class);
public static final int TEST_JOB_VERSION = 1;
public static final String FIRST_STEP_ID = "first-step";
public static final String LAST_STEP_ID = "last-step";
@ -31,6 +52,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
@Autowired
Batch2JobHelper myBatch2JobHelper;
@Autowired
IJobPersistence myJobPersistence;
private final PointcutLatch myFirstStepLatch = new PointcutLatch("First Step");
private final PointcutLatch myLastStepLatch = new PointcutLatch("Last Step");
@ -58,7 +82,6 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
}
@Test
public void testFirstStepToSecondStep_singleChunkFasttracks() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
@ -87,6 +110,104 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myLastStepLatch.awaitExpected();
}
@Test
public void testJobDefinitionWithReductionStepIT() throws InterruptedException {
// setup
String testInfo = "test";
AtomicInteger secondStepInt = new AtomicInteger();
// step 1
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
sink.accept(new FirstStepOutput());
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
// step 2
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
sink.accept(output);
return RunOutcome.SUCCESS;
};
// step 3
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> last = new IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput>() {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
myOutput.add(theChunkDetails.getData());
return ChunkOutcome.SUCCESS();
}
@NotNull
@Override
public RunOutcome run(@NotNull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@NotNull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException {
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
return RunOutcome.SUCCESS;
}
};
// create job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
first
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
second)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
last
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
// run test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
String instanceId = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
// wait for last step to finish
myLastStepLatch.setExpectedCount(1);
myBatch2JobHelper.awaitMultipleChunkJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent());
int secondStepCalls = secondStepInt.get();
assertEquals(2, secondStepCalls);
JobInstance instance = instanceOp.get();
ourLog.info(JsonUtil.serialize(instance, true));
assertNotNull(instance.getReport());
for (int i = 0; i < secondStepInt.get(); i++) {
assertTrue(instance.getReport().contains(
testInfo + i
));
}
}
@Test
public void testFirstStepToSecondStep_doubleChunk_doesNotFastTrack() throws InterruptedException {
@ -210,4 +331,25 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
FirstStepOutput() {
}
}
static class SecondStepOutput implements IModelJson {
@JsonProperty("test")
private String myTestValue;
SecondStepOutput() {
}
public void setValue(String theV) {
myTestValue = theV;
}
}
static class ReductionStepOutput implements IModelJson {
@JsonProperty("result")
private List<?> myResult;
ReductionStepOutput(List<?> theResult) {
myResult = theResult;
}
}
}

View File

@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@ -115,6 +116,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
assertEquals(instance.getReport(), foundInstance.getReport());
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalStateException());
@ -264,8 +266,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
}
@Test
@ -367,8 +367,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime());
assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime());
});
}
@Test
@ -414,9 +412,39 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(0.5d, finalInstance.getProgress());
assertTrue(finalInstance.isWorkChunksPurged());
assertEquals(3, finalInstance.getErrorCount());
assertEquals(instance.getReport(), finalInstance.getReport());
assertEquals(instance.getEstimatedTimeRemaining(), finalInstance.getEstimatedTimeRemaining());
}
@Test
public void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
ArrayList<String> chunkIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BatchWorkChunk chunk = new BatchWorkChunk(
"defId",
1,
"stepId",
instanceId,
0,
"{}"
);
String id = mySvc.storeWorkChunk(chunk);
chunkIds.add(id);
}
mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, StatusEnum.COMPLETED, null);
Iterator<WorkChunk> reducedChunks = mySvc.fetchAllWorkChunksIterator(instanceId, true);
while (reducedChunks.hasNext()) {
WorkChunk reducedChunk = reducedChunks.next();
assertTrue(chunkIds.contains(reducedChunk.getId()));
assertEquals(StatusEnum.COMPLETED, reducedChunk.getStatus());
}
}
@Nonnull
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
@ -424,6 +452,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
instance.setReport("TEST");
return instance;
}

View File

@ -2,22 +2,16 @@ package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.test.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.rp.dstu3.ObservationResourceProvider;
import ca.uhn.fhir.jpa.rp.dstu3.OrganizationResourceProvider;
import ca.uhn.fhir.jpa.rp.dstu3.PatientResourceProvider;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.jpa.test.BaseJpaDstu3Test;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.test.utilities.JettyUtil;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@ -37,7 +31,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -46,7 +39,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -13,4 +13,4 @@
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.model.api.IModelJson;
public class ChunkExecutionDetails<PT extends IModelJson, IT extends IModelJson> {
private final IT myData;
private final PT myParameters;
private final String myInstanceId;
private final String myChunkId;
public ChunkExecutionDetails(IT theData,
PT theParameters,
String theInstanceId,
String theChunkId) {
myData = theData;
myParameters = theParameters;
myInstanceId = theInstanceId;
myChunkId = theChunkId;
}
public IT getData() {
return myData;
}
public PT getParameters() {
return myParameters;
}
public String getInstanceId() {
return myInstanceId;
}
public String getChunkId() {
return myChunkId;
}
}

View File

@ -22,8 +22,10 @@ package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -103,6 +105,15 @@ public interface IJobPersistence {
*/
void markWorkChunkAsCompletedAndClearData(String theChunkId, int theRecordsProcessed);
/**
* Marks all work chunks with the provided status and erases the data
* @param theInstanceId - the instance id
* @param theChunkIds - the ids of work chunks being reduced to single chunk
* @param theStatus - the status to mark
* @param theErrorMsg - error message (if status warrants it)
*/
void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMsg);
/**
* Increments the work chunk error count by the given amount
*
@ -120,6 +131,14 @@ public interface IJobPersistence {
*/
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
/**
* Fetch all chunks for a given instance.
* @param theInstanceId - instance id
* @param theWithData - whether or not to include the data
* @return - an iterator for fetching work chunks
*/
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/**
* Update the stored instance
*

View File

@ -49,6 +49,4 @@ public interface IJobStepWorker<PT extends IModelJson, IT extends IModelJson, OT
*/
@Nonnull
RunOutcome run(@Nonnull StepExecutionDetails<PT, IT> theStepExecutionDetails, @Nonnull IJobDataSink<OT> theDataSink) throws JobExecutionFailedException;
}

View File

@ -0,0 +1,46 @@
package ca.uhn.fhir.batch2.api;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.model.api.IModelJson;
import javax.annotation.Nonnull;
/**
* Reduction step worker.
* @param <PT> Job Parameter Type
* @param <IT> Input Parameter type (real input for step is ListResult of IT
* @param <OT> Output Job Report Type
*/
public interface IReductionStepWorker<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends IJobStepWorker<PT, IT, OT> {
/**
*
* If an exception is thrown, the workchunk will be marked as failed.
* @param theChunkDetails - the workchunk details for reduction
* @return
*/
@Nonnull
ChunkOutcome consume(ChunkExecutionDetails<PT, IT> theChunkDetails);
}

View File

@ -0,0 +1,56 @@
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.model.ListResult;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
/**
* This class is used for Reduction Step for Batch2 Jobs.
* @param <PT> - Job Parameters type
* @param <IT> - Input data type
* @param <OT> - Output data type. Output will actually be a ListResult of these objects.
*/
public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends StepExecutionDetails<PT, IT> {
public ReductionStepExecutionDetails(@Nonnull PT theParameters,
@Nullable IT theData,
@Nonnull String theInstanceId) {
super(theParameters, theData, theInstanceId, "VOID");
}
@Override
@Nonnull
public final IT getData() {
throw new UnsupportedOperationException(Msg.code(2099) + " Reduction steps should have all data by the time execution is called.");
}
@Override
public boolean hasAssociatedWorkChunk() {
return false;
}
}

View File

@ -33,7 +33,10 @@ public class StepExecutionDetails<PT extends IModelJson, IT extends IModelJson>
private final String myInstanceId;
private final String myChunkId;
public StepExecutionDetails(@Nonnull PT theParameters, @Nullable IT theData, @Nonnull String theInstanceId, @Nonnull String theChunkId) {
public StepExecutionDetails(@Nonnull PT theParameters,
@Nullable IT theData,
@Nonnull String theInstanceId,
@Nonnull String theChunkId) {
Validate.notNull(theParameters);
myParameters = theParameters;
myData = theData;
@ -42,9 +45,10 @@ public class StepExecutionDetails<PT extends IModelJson, IT extends IModelJson>
}
/**
* Returns the data associated with this step execution. This method should never be
* called during the first step of a job, and will never return <code>null</code> during
* any subsequent steps.
* Returns the data associated with this step execution.
* This method should never be called during the first step of a job,
* or in a reduction step, and will never return <code>null</code> during
* any other steps.
*
* @throws NullPointerException If this method is called during the first step of a job
*/
@ -79,4 +83,13 @@ public class StepExecutionDetails<PT extends IModelJson, IT extends IModelJson>
return myChunkId;
}
/**
* Returns true if there's a workchunk to store data to.
* If false, failures and data storage go straight to the jobinstance instead
* @return - true if there's a workchunk in the db to store to.
* false if the output goes to the jobinstance instead
*/
public boolean hasAssociatedWorkChunk() {
return true;
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.StepExecutionSvc;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
@ -34,6 +35,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -42,29 +44,51 @@ public abstract class BaseBatch2Config {
public static final String CHANNEL_NAME = "batch2-work-notification";
@Autowired
private IJobPersistence myPersistence;
@Autowired
private IChannelFactory myChannelFactory;
@Bean
public JobDefinitionRegistry batch2JobDefinitionRegistry() {
return new JobDefinitionRegistry();
}
@Bean
public BatchJobSender batchJobSender(IChannelFactory theChannelFactory) {
return new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory));
public StepExecutionSvc jobStepExecutorService(BatchJobSender theBatchJobSender) {
return new StepExecutionSvc(myPersistence, theBatchJobSender);
}
@Bean
public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
public BatchJobSender batchJobSender() {
return new BatchJobSender(batch2ProcessingChannelProducer(myChannelFactory));
}
@Bean
public IJobCoordinator batch2JobCoordinator(JobDefinitionRegistry theJobDefinitionRegistry,
BatchJobSender theBatchJobSender,
StepExecutionSvc theExecutor) {
return new JobCoordinatorImpl(
theBatchJobSender,
batch2ProcessingChannelReceiver(theChannelFactory),
theJobInstancePersister,
theJobDefinitionRegistry
batch2ProcessingChannelReceiver(myChannelFactory),
myPersistence,
theJobDefinitionRegistry,
theExecutor
);
}
@Bean
public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence, JobDefinitionRegistry theJobDefinitionRegistry, BatchJobSender theBatchJobSender) {
return new JobMaintenanceServiceImpl(theSchedulerService, theJobPersistence, theJobDefinitionRegistry, theBatchJobSender);
public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService,
JobDefinitionRegistry theJobDefinitionRegistry,
BatchJobSender theBatchJobSender,
StepExecutionSvc theExecutor
) {
return new JobMaintenanceServiceImpl(theSchedulerService,
myPersistence,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor
);
}
@Bean

View File

@ -33,10 +33,13 @@ abstract class BaseDataSink<PT extends IModelJson, IT extends IModelJson, OT ext
private final String myInstanceId;
private final JobWorkCursor<PT,IT,OT> myJobWorkCursor;
private int myRecoveredErrorCount;
protected final String myJobDefinitionId;
protected BaseDataSink(String theInstanceId, JobWorkCursor<PT,IT,OT> theJobWorkCursor) {
protected BaseDataSink(String theInstanceId,
JobWorkCursor<PT,IT,OT> theJobWorkCursor) {
myInstanceId = theInstanceId;
myJobWorkCursor = theJobWorkCursor;
myJobDefinitionId = theJobWorkCursor.getJobDefinition().getJobDefinitionId();
}
public String getInstanceId() {
@ -62,4 +65,8 @@ abstract class BaseDataSink<PT extends IModelJson, IT extends IModelJson, OT ext
public JobDefinitionStep<PT,IT,OT> getTargetStep() {
return myJobWorkCursor.currentStep;
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
}

View File

@ -34,14 +34,11 @@ import javax.annotation.Nonnull;
class FinalStepDataSink<PT extends IModelJson, IT extends IModelJson> extends BaseDataSink<PT,IT,VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(FinalStepDataSink.class);
private final String myJobDefinitionId;
/**
* Constructor
*/
FinalStepDataSink(@Nonnull String theJobDefinitionId, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT,IT,VoidModel> theJobWorkCursor) {
super(theInstanceId, theJobWorkCursor);
myJobDefinitionId = theJobDefinitionId;
}
@Override

View File

@ -55,7 +55,12 @@ public class JobCoordinatorImpl implements IJobCoordinator {
/**
* Constructor
*/
public JobCoordinatorImpl(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IChannelReceiver theWorkChannelReceiver, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
public JobCoordinatorImpl(@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IChannelReceiver theWorkChannelReceiver,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull StepExecutionSvc theExecutorSvc
) {
Validate.notNull(theJobPersistence);
myJobPersistence = theJobPersistence;
@ -63,7 +68,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
myWorkChannelReceiver = theWorkChannelReceiver;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myReceiverHandler = new WorkChannelMessageHandler(theJobPersistence, theJobDefinitionRegistry, theBatchJobSender);
myReceiverHandler = new WorkChannelMessageHandler(theJobPersistence, theJobDefinitionRegistry, theBatchJobSender, theExecutorSvc);
myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry);
myJobParameterJsonValidator = new JobParameterJsonValidator();
}

View File

@ -22,7 +22,11 @@ package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.*;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
@ -41,7 +45,11 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
private final AtomicReference<String> myLastChunkId = new AtomicReference<>();
private final boolean myGatedExecution;
JobDataSink(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition<?> theDefinition, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
JobDataSink(@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinition<?> theDefinition,
@Nonnull String theInstanceId,
@Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
super(theInstanceId, theJobWorkCursor);
myBatchJobSender = theBatchJobSender;
myJobPersistence = theJobPersistence;
@ -55,6 +63,7 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
public void accept(WorkChunkData<OT> theData) {
String instanceId = getInstanceId();
String targetStepId = myTargetStep.getStepId();
int sequence = myChunkCounter.getAndIncrement();
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);

View File

@ -21,77 +21,71 @@ package ca.uhn.fhir.batch2.coordinator;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.Optional;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = LoggerFactory.getLogger(JobStepExecutor.class);
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final StepExecutionSvc myJobExecutorSvc;
private final JobDefinition<PT> myDefinition;
private final JobInstance myInstance;
private final String myInstanceId;
private final WorkChunk myWorkChunk;
private final JobWorkCursor<PT, IT, OT> myCursor;
private final PT myParameters;
JobStepExecutor(@Nonnull IJobPersistence theJobPersistence, @Nonnull BatchJobSender theBatchJobSender, @Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
JobStepExecutor(@Nonnull IJobPersistence theJobPersistence,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull JobInstance theInstance,
@Nonnull WorkChunk theWorkChunk,
@Nonnull JobWorkCursor<PT, IT, OT> theCursor,
@Nonnull StepExecutionSvc theExecutor) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myDefinition = theCursor.jobDefinition;
myInstance = theInstance;
myInstanceId = theInstance.getInstanceId();
myParameters = theInstance.getParameters(myDefinition.getParametersType());
myWorkChunk = theWorkChunk;
myCursor = theCursor;
myJobExecutorSvc = theExecutor;
}
@SuppressWarnings("unchecked")
void executeStep() {
BaseDataSink<PT,IT,OT> dataSink;
if (myCursor.isFinalStep()) {
dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>(myDefinition.getJobDefinitionId(), myInstanceId, myCursor.asFinalCursor());
} else {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, myDefinition, myInstanceId, myCursor);
}
JobStepExecutorOutput<PT, IT, OT> stepExecutorOutput = myJobExecutorSvc.doExecution(
myCursor,
myInstance,
myWorkChunk
);
boolean success = executeStep(myDefinition.getJobDefinitionId(), myWorkChunk, myParameters, dataSink);
if (!success) {
if (!stepExecutorOutput.isSuccessful()) {
return;
}
if (dataSink.firstStepProducedNothing()) {
if (stepExecutorOutput.getDataSink().firstStepProducedNothing()) {
ourLog.info("First step of job myInstance {} produced no work chunks, marking as completed", myInstanceId);
myJobPersistence.markInstanceAsCompleted(myInstanceId);
}
if (myDefinition.isGatedExecution()) {
handleGatedExecution(dataSink);
handleGatedExecution(stepExecutorOutput.getDataSink());
}
}
@ -141,48 +135,4 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
myJobPersistence.updateInstance(jobInstance);
return jobInstance;
}
private boolean executeStep(String theJobDefinitionId, @Nonnull WorkChunk theWorkChunk, PT theParameters, BaseDataSink<PT,IT,OT> theDataSink) {
JobDefinitionStep<PT, IT, OT> theTargetStep = theDataSink.getTargetStep();
String targetStepId = theTargetStep.getStepId();
Class<IT> inputType = theTargetStep.getInputType();
IJobStepWorker<PT, IT, OT> worker = theTargetStep.getJobStepWorker();
IT inputData = null;
if (!inputType.equals(VoidModel.class)) {
inputData = theWorkChunk.getData(inputType);
}
String instanceId = theWorkChunk.getInstanceId();
String chunkId = theWorkChunk.getId();
StepExecutionDetails<PT, IT> stepExecutionDetails = new StepExecutionDetails<>(theParameters, inputData, instanceId, chunkId);
RunOutcome outcome;
try {
outcome = worker.run(stepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", worker.getClass());
} catch (JobExecutionFailedException e) {
ourLog.error("Unrecoverable failure executing job {} step {}", theJobDefinitionId, targetStepId, e);
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
return false;
} catch (Exception e) {
ourLog.error("Failure executing job {} step {}", theJobDefinitionId, targetStepId, e);
myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
throw new JobStepFailedException(Msg.code(2041) + e.getMessage(), e);
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", theJobDefinitionId, targetStepId, t);
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
return false;
}
int recordsProcessed = outcome.getRecordsProcessed();
myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
return true;
}
}

View File

@ -32,13 +32,17 @@ import javax.annotation.Nonnull;
public class JobStepExecutorFactory {
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final StepExecutionSvc myJobStepExecutorSvc;
public JobStepExecutorFactory(@Nonnull IJobPersistence theJobPersistence, @Nonnull BatchJobSender theBatchJobSender) {
public JobStepExecutorFactory(@Nonnull IJobPersistence theJobPersistence,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull StepExecutionSvc theExecutorSvc) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myJobStepExecutorSvc = theExecutorSvc;
}
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
return new JobStepExecutor<>(myJobPersistence, myBatchJobSender, theInstance, theWorkChunk, theCursor);
return new JobStepExecutor<>(myJobPersistence, myBatchJobSender, theInstance, theWorkChunk, theCursor, myJobStepExecutorSvc);
}
}

View File

@ -0,0 +1,22 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
public class JobStepExecutorOutput<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private final boolean mySuccess;
private final BaseDataSink<PT, IT, OT> myDataSink;
public JobStepExecutorOutput(boolean theIsSuccessful, BaseDataSink<PT, IT, OT> theDataSink) {
mySuccess = theIsSuccessful;
myDataSink = theDataSink;
}
public boolean isSuccessful() {
return mySuccess;
}
public BaseDataSink<PT, IT, OT> getDataSink() {
return myDataSink;
}
}

View File

@ -0,0 +1,61 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
public class ReductionStepDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends BaseDataSink<PT, IT, OT> {
private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepDataSink.class);
private final IJobPersistence myJobPersistence;
protected ReductionStepDataSink(String theInstanceId,
JobWorkCursor<PT, IT, OT> theJobWorkCursor,
JobDefinition<PT> theDefinition,
IJobPersistence thePersistence) {
super(theInstanceId, theJobWorkCursor);
myJobPersistence = thePersistence;
}
@Override
public void accept(WorkChunkData<OT> theData) {
String instanceId = getInstanceId();
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
if (instanceOp.isPresent()) {
JobInstance instance = instanceOp.get();
if (instance.getReport() != null) {
// last in wins - so we won't throw
ourLog.error(
"Report has already been set. Now it is being overwritten. Last in will win!");
}
OT data = theData.getData();
String dataString = JsonUtil.serialize(data, false);
instance.setReport(dataString);
ourLog.debug(JsonUtil.serialize(instance));
myJobPersistence.updateInstance(instance);
} else {
String msg = "No instance found with Id " + instanceId;
ourLog.error(msg);
throw new JobExecutionFailedException(Msg.code(2097) + msg);
}
}
@Override
public int getWorkChunkCount() {
return 0;
}
}

View File

@ -0,0 +1,310 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.ReductionStepExecutionDetails;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
public class StepExecutionSvc {
private static final Logger ourLog = LoggerFactory.getLogger(StepExecutionSvc.class);
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
public StepExecutionSvc(IJobPersistence thePersistence,
BatchJobSender theSender) {
myJobPersistence = thePersistence;
myBatchJobSender = theSender;
}
/**
* Execute the work chunk.
*
* @param theCursor - work cursor
* @param theInstance - the job instance
* @param theWorkChunk - the work chunk (if available); can be null (for reduction step only!)
* @param <PT> - Job parameters Type
* @param <IT> - Step input parameters Type
* @param <OT> - Step output parameters Type
* @return - JobStepExecution output. Contains the datasink and whether or not the execution had succeeded.
*/
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutorOutput<PT, IT, OT>
doExecution(
JobWorkCursor<PT, IT, OT> theCursor,
JobInstance theInstance,
@Nullable WorkChunk theWorkChunk
) {
JobDefinitionStep<PT, IT, OT> step = theCursor.getCurrentStep();
JobDefinition<PT> jobDefinition = theCursor.getJobDefinition();
String instanceId = theInstance.getInstanceId();
Class<IT> inputType = step.getInputType();
PT parameters = theInstance.getParameters(jobDefinition.getParametersType());
IJobStepWorker<PT, IT, OT> worker = step.getJobStepWorker();
BaseDataSink<PT, IT, OT> dataSink = getDataSink(theCursor, jobDefinition, instanceId);
StepExecutionDetails<PT, IT> stepExecutionDetails;
if (step.isReductionStep()) {
// reduction step details
boolean success = executeReductionStep(theInstance,
step,
inputType,
parameters,
dataSink);
return new JobStepExecutorOutput<>(success, dataSink);
} else {
// all other kinds of steps
Validate.notNull(theWorkChunk);
stepExecutionDetails = getExecutionDetailsForNonReductionStep(theWorkChunk, instanceId, inputType, parameters);
// execute the step
boolean success = executeStep(stepExecutionDetails,
worker,
dataSink);
// return results with data sink
return new JobStepExecutorOutput<>(success, dataSink);
}
}
/**
* Get the correct datasink for the cursor/job provided.
*/
@SuppressWarnings("unchecked")
protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> BaseDataSink<PT, IT, OT> getDataSink(
JobWorkCursor<PT, IT, OT> theCursor,
JobDefinition<PT> theJobDefinition,
String theInstanceId
) {
BaseDataSink<PT, IT, OT> dataSink;
if (theCursor.isReductionStep()) {
dataSink = new ReductionStepDataSink<>(
theInstanceId,
theCursor,
theJobDefinition,
myJobPersistence
);
}
else if (theCursor.isFinalStep()) {
dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>(theJobDefinition.getJobDefinitionId(), theInstanceId, theCursor.asFinalCursor());
} else {
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinition, theInstanceId, theCursor);
}
return dataSink;
}
/**
* Construct execution details for non-reduction step
*/
private <PT extends IModelJson, IT extends IModelJson> StepExecutionDetails<PT, IT> getExecutionDetailsForNonReductionStep(
WorkChunk theWorkChunk,
String theInstanceId,
Class<IT> theInputType,
PT theParameters
) {
StepExecutionDetails<PT, IT> stepExecutionDetails;
IT inputData = null;
if (!theInputType.equals(VoidModel.class)) {
inputData = theWorkChunk.getData(theInputType);
}
String chunkId = theWorkChunk.getId();
stepExecutionDetails = new StepExecutionDetails<>(
theParameters,
inputData,
theInstanceId,
chunkId
);
return stepExecutionDetails;
}
/**
* Do work and construct execution details for job reduction step
*/
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeReductionStep(
JobInstance theInstance,
JobDefinitionStep<PT, IT, OT> theStep,
Class<IT> theInputType,
PT theParameters,
BaseDataSink<PT, IT, OT> theDataSink
) {
IReductionStepWorker<PT, IT, OT> theReductionWorker = (IReductionStepWorker<PT, IT, OT>) theStep.getJobStepWorker();
// We fetch all chunks first...
Iterator<WorkChunk> chunkIterator = myJobPersistence.fetchAllWorkChunksIterator(theInstance.getInstanceId(), true);
List<String> failedChunks = new ArrayList<>();
List<String> successfulChunkIds = new ArrayList<>();
boolean jobFailed = false;
while (chunkIterator.hasNext()) {
WorkChunk chunk = chunkIterator.next();
if (chunk.getStatus() != StatusEnum.QUEUED) {
// we are currently fetching all statuses from the db
// we will ignore non-completed steps.
// should we throw for errored values we find here?
continue;
}
if (!failedChunks.isEmpty()) {
// we are going to fail all future chunks now
failedChunks.add(chunk.getId());
} else {
try {
// feed them into our reduction worker
// this is the most likely area to throw,
// as this is where db actions and processing is likely to happen
ChunkOutcome outcome = theReductionWorker.consume(new ChunkExecutionDetails<>(
chunk.getData(theInputType),
theParameters,
theInstance.getInstanceId(),
chunk.getId()
));
switch (outcome.getStatuss()) {
case SUCCESS:
successfulChunkIds.add(chunk.getId());
break;
case ABORT:
ourLog.error("Processing of work chunk {} resulted in aborting job.", chunk.getId());
// fail entire job - including all future workchunks
failedChunks.add(chunk.getId());
jobFailed = true;
break;
case FAIL:
myJobPersistence.markWorkChunkAsFailed(chunk.getId(),
"Step worker failed to process work chunk " + chunk.getId());
jobFailed = true;
break;
}
} catch (Exception e) {
String msg = String.format(
"Reduction step failed to execute chunk reduction for chunk %s with exception: %s.",
chunk.getId(),
e.getMessage()
);
// we got a failure in a reduction
ourLog.error(msg);
jobFailed = true;
myJobPersistence.markWorkChunkAsFailed(chunk.getId(), msg);
}
}
}
if (!successfulChunkIds.isEmpty()) {
// complete the steps without making a new work chunk
myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(),
successfulChunkIds,
StatusEnum.COMPLETED,
null // error message - none
);
}
if (!failedChunks.isEmpty()) {
// mark any failed chunks as failed for aborting
myJobPersistence.markWorkChunksWithStatusAndWipeData(theInstance.getInstanceId(),
failedChunks,
StatusEnum.FAILED,
"JOB ABORTED");
}
// if no successful chunks, return false
if (successfulChunkIds.isEmpty()) {
return false;
}
// we'll call execute (to reuse existing architecture)
// the data sink will do the storing to the instance (and not the chunks).
// it is assumed the OT (report) data is smaller than the list of all IT data
ReductionStepExecutionDetails<PT, IT, OT> executionDetails = new ReductionStepExecutionDetails<>(
theParameters,
null,
theInstance.getInstanceId()
);
return executeStep(executionDetails,
theReductionWorker,
theDataSink) && !jobFailed;
}
/**
* Calls the worker execution step, and performs error handling logic for jobs that failed.
*/
private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(
StepExecutionDetails<PT, IT> theStepExecutionDetails,
IJobStepWorker<PT, IT, OT> theStepWorker,
BaseDataSink<PT, IT, OT> theDataSink
) {
String jobDefinitionId = theDataSink.getJobDefinitionId();
String targetStepId = theDataSink.getTargetStep().getStepId();
String chunkId = theStepExecutionDetails.getChunkId();
RunOutcome outcome;
try {
outcome = theStepWorker.run(theStepExecutionDetails, theDataSink);
Validate.notNull(outcome, "Step theWorker returned null: %s", theStepWorker.getClass());
} catch (JobExecutionFailedException e) {
ourLog.error("Unrecoverable failure executing job {} step {}",
jobDefinitionId,
targetStepId,
e);
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
}
return false;
} catch (Exception e) {
ourLog.error("Failure executing job {} step {}", jobDefinitionId, targetStepId, e);
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, e.toString());
}
throw new JobStepFailedException(Msg.code(2041) + e.getMessage(), e);
} catch (Throwable t) {
ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
myJobPersistence.markWorkChunkAsFailed(chunkId, t.toString());
}
return false;
}
if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
int recordsProcessed = outcome.getRecordsProcessed();
int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
if (recoveredErrorCount > 0) {
myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
}
}
return true;
}
}

View File

@ -22,8 +22,10 @@ package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -88,6 +90,11 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
myWrap.markWorkChunkAsCompletedAndClearData(theChunkId, theRecordsProcessed);
}
@Override
public void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, StatusEnum theStatus, String theErrorMsg) {
myWrap.markWorkChunksWithStatusAndWipeData(theInstanceId, theChunkIds, theStatus, theErrorMsg);
}
@Override
public void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy) {
myWrap.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
@ -98,6 +105,12 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
return myWrap.fetchWorkChunksWithoutData(theInstanceId, thePageSize, thePageIndex);
}
@Override
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
return myWrap.fetchAllWorkChunksIterator(theInstanceId, theWithData);
}
@Override
public synchronized void updateInstance(JobInstance theInstance) {
myWrap.updateInstance(theInstance);

View File

@ -49,10 +49,14 @@ class WorkChannelMessageHandler implements MessageHandler {
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final JobStepExecutorFactory myJobStepExecutorFactory;
WorkChannelMessageHandler(@Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
WorkChannelMessageHandler(@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull StepExecutionSvc theExecutorSvc
) {
myJobPersistence = theJobPersistence;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myJobStepExecutorFactory = new JobStepExecutorFactory(theJobPersistence, theBatchJobSender);
myJobStepExecutorFactory = new JobStepExecutorFactory(theJobPersistence, theBatchJobSender, theExecutorSvc);
}
@Override

View File

@ -51,7 +51,7 @@ public class JobChunkProgressAccumulator {
return getChunkIdsWithStatus(theInstanceId, theStepId, theStatuses).size();
}
List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
public List<String> getChunkIdsWithStatus(String theInstanceId, String theStepId, Set<StatusEnum> theStatuses) {
return getChunkStatuses(theInstanceId).stream().filter(t -> t.myStepId.equals(theStepId)).filter(t -> theStatuses.contains(t.myStatus)).map(t -> t.myChunkId).collect(Collectors.toList());
}

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutorOutput;
import ca.uhn.fhir.batch2.coordinator.StepExecutionSvc;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
@ -44,11 +46,18 @@ public class JobInstanceProcessor {
private final JobInstance myInstance;
private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
private final StepExecutionSvc myJobExecutorSvc;
JobInstanceProcessor(IJobPersistence theJobPersistence, BatchJobSender theBatchJobSender, JobInstance theInstance, JobChunkProgressAccumulator theProgressAccumulator) {
JobInstanceProcessor(IJobPersistence theJobPersistence,
BatchJobSender theBatchJobSender,
JobInstance theInstance,
JobChunkProgressAccumulator theProgressAccumulator,
StepExecutionSvc theExecutorSvc
) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myInstance = theInstance;
myJobExecutorSvc = theExecutorSvc;
myProgressAccumulator = theProgressAccumulator;
myJobInstanceProgressCalculator = new JobInstanceProgressCalculator(theJobPersistence, theInstance, theProgressAccumulator);
}
@ -69,7 +78,7 @@ public class JobInstanceProcessor {
private String buildCancelledMessage() {
String msg = "Job instance cancelled";
if (myInstance.hasGatedStep()) {
if (myInstance.hasGatedStep()) {
msg += " while running step " + myInstance.getCurrentGatedStepId();
}
return msg;
@ -120,8 +129,9 @@ public class JobInstanceProcessor {
return;
}
JobWorkCursor<?,?,?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), myInstance.getCurrentGatedStepId());
JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), myInstance.getCurrentGatedStepId());
// final step
if (jobWorkCursor.isFinalStep()) {
return;
}
@ -129,21 +139,43 @@ public class JobInstanceProcessor {
String instanceId = myInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
int incompleteChunks = myProgressAccumulator.countChunksWithStatus(instanceId, currentStepId, StatusEnum.getIncompleteStatuses());
if (incompleteChunks == 0) {
if (incompleteChunks == 0) {
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info("All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId);
List<String> chunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
for (String nextChunkId : chunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
myInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(myInstance);
if (jobWorkCursor.nextStep.isReductionStep()) {
processReductionStep(jobWorkCursor);
} else {
// otherwise, continue processing as expected
processChunksForNextSteps(instanceId, nextStepId);
}
}
}
private void processChunksForNextSteps(String instanceId, String nextStepId) {
List<String> chunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, EnumSet.of(StatusEnum.QUEUED));
for (String nextChunkId : chunksForNextStep) {
JobWorkNotification workNotification = new JobWorkNotification(myInstance, nextStepId, nextChunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
myInstance.setCurrentGatedStepId(nextStepId);
myJobPersistence.updateInstance(myInstance);
}
private void processReductionStep(JobWorkCursor<?, ?, ?> jobWorkCursor) {
// do execution of the final step now
// (ie, we won't send to job workers)
JobStepExecutorOutput<?, ?, ?> result = myJobExecutorSvc.doExecution(
JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), jobWorkCursor.nextStep.getStepId()),
myInstance,
null);
if (!result.isSuccessful()) {
myInstance.setStatus(StatusEnum.FAILED);
myJobPersistence.updateInstance(myInstance);
}
}
public static boolean updateInstanceStatus(JobInstance myInstance, StatusEnum newStatus) {
@ -154,5 +186,4 @@ public class JobInstanceProcessor {
}
return false;
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.StepExecutionSvc;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -71,12 +72,17 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
private final ISchedulerService mySchedulerService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender;
private final StepExecutionSvc myJobExecutorSvc;
/**
* Constructor
*/
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender) {
public JobMaintenanceServiceImpl(@Nonnull ISchedulerService theSchedulerService,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull StepExecutionSvc theExecutor
) {
Validate.notNull(theSchedulerService);
Validate.notNull(theJobPersistence);
Validate.notNull(theJobDefinitionRegistry);
@ -86,6 +92,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
mySchedulerService = theSchedulerService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender;
myJobExecutorSvc = theExecutor;
}
@PostConstruct
@ -109,7 +116,8 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService {
for (JobInstance instance : instances) {
if (processedInstanceIds.add(instance.getInstanceId())) {
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence, myBatchJobSender, instance, progressAccumulator);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence,
myBatchJobSender, instance, progressAccumulator, myJobExecutorSvc);
jobInstanceProcessor.process();
}
}

View File

@ -0,0 +1,23 @@
package ca.uhn.fhir.batch2.model;
public class ChunkOutcome {
public enum Status {
SUCCESS,
FAIL,
ABORT;
}
private final Status myStatus;
public ChunkOutcome(Status theStatus) {
myStatus = theStatus;
}
public Status getStatuss() {
return myStatus;
}
public static ChunkOutcome SUCCESS() {
return new ChunkOutcome(Status.SUCCESS);
}
}

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.model.api.IModelJson;
import org.apache.commons.lang3.Validate;
@ -33,6 +34,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@ -225,6 +227,11 @@ public class JobDefinition<PT extends IModelJson> {
return new Builder<>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, VoidModel.class, myParametersValidator, myGatedExecution, myCompletionHandler);
}
public <OT extends IModelJson> Builder<PT, OT> addFinalReducerStep(String theStepId, String theStepDescription, Class<OT> theOutputType, IReductionStepWorker<PT, NIT, OT> theStepWorker) {
mySteps.add(new JobDefinitionReductionStep<PT, NIT, OT>(theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
return new Builder<PT, OT>(mySteps, myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, theOutputType, myParametersValidator, myGatedExecution, myCompletionHandler);
}
public JobDefinition<PT> build() {
Validate.notNull(myJobParametersType, "No job parameters type was supplied");
return new JobDefinition<>(myJobDefinitionId, myJobDefinitionVersion, myJobDescription, myJobParametersType, Collections.unmodifiableList(mySteps), myParametersValidator, myGatedExecution, myCompletionHandler);

View File

@ -0,0 +1,25 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.model.api.IModelJson;
import javax.annotation.Nonnull;
public class JobDefinitionReductionStep<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends JobDefinitionStep<PT, IT, OT> {
public JobDefinitionReductionStep(@Nonnull String theStepId,
@Nonnull String theStepDescription,
@Nonnull IReductionStepWorker<PT, IT, OT> theJobStepWorker,
@Nonnull Class<IT> theInputType,
@Nonnull Class<OT> theOutputType) {
super(theStepId, theStepDescription, (IJobStepWorker<PT, IT, OT>) theJobStepWorker, theInputType, theOutputType);
}
@Override
public boolean isReductionStep() {
return true;
}
}

View File

@ -32,12 +32,16 @@ public class JobDefinitionStep<PT extends IModelJson, IT extends IModelJson, OT
private final String myStepId;
private final String myStepDescription;
private final IJobStepWorker<PT, IT, OT> myJobStepWorker;
protected final IJobStepWorker<PT, IT, OT> myJobStepWorker;
private final Class<IT> myInputType;
private final Class<OT> myOutputType;
public JobDefinitionStep(@Nonnull String theStepId, @Nonnull String theStepDescription, @Nonnull IJobStepWorker<PT, IT, OT> theJobStepWorker, @Nonnull Class<IT> theInputType, @Nonnull Class<OT> theOutputType) {
public JobDefinitionStep(@Nonnull String theStepId,
@Nonnull String theStepDescription,
@Nonnull IJobStepWorker<PT, IT, OT> theJobStepWorker,
@Nonnull Class<IT> theInputType,
@Nonnull Class<OT> theOutputType) {
Validate.notBlank(theStepId, "No step ID specified");
Validate.isTrue(theStepId.length() <= ID_MAX_LENGTH, "Maximum ID length is %d", ID_MAX_LENGTH);
Validate.notBlank(theStepDescription);
@ -68,4 +72,8 @@ public class JobDefinitionStep<PT extends IModelJson, IT extends IModelJson, OT
public Class<OT> getOutputType() {
return myOutputType;
}
public boolean isReductionStep() {
return false;
}
}

View File

@ -86,6 +86,9 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
@JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY)
private String myEstimatedTimeRemaining;
@JsonProperty(value = "report", access = JsonProperty.Access.READ_WRITE)
private String myReport;
@JsonIgnore
private JobDefinition<?> myJobDefinition;
@ -117,6 +120,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
setReport(theJobInstance.getReport());
myJobDefinition = theJobInstance.getJobDefinition();
}
@ -271,6 +275,14 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
myCancelled = theCancelled;
}
public String getReport() {
return myReport;
}
public void setReport(String theReport) {
myReport = theReport;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@ -288,6 +300,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson {
.append("errorMessage", myErrorMessage)
.append("errorCount", myErrorCount)
.append("estimatedTimeRemaining", myEstimatedTimeRemaining)
.append("record", myReport)
.toString();
}

View File

@ -107,4 +107,15 @@ public class JobWorkCursor<PT extends IModelJson, IT extends IModelJson, OT exte
return (JobWorkCursor<PT,IT, VoidModel>)this;
}
public JobDefinition<PT> getJobDefinition() {
return jobDefinition;
}
public JobDefinitionStep<PT, IT, OT> getCurrentStep() {
return currentStep;
}
public boolean isReductionStep() {
return currentStep.isReductionStep();
}
}

View File

@ -0,0 +1,19 @@
package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class ListResult<T> implements IModelJson {
@JsonProperty("data")
private List<T> myData;
public ListResult(List<T> theData) {
myData = theData;
}
public List<T> getData() {
return myData;
}
}

View File

@ -22,11 +22,11 @@ package ca.uhn.fhir.batch2.progress;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.Iterator;
import java.util.List;
import static ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor.updateInstanceStatus;
@ -45,17 +45,12 @@ public class JobInstanceProgressCalculator {
public void calculateAndStoreInstanceProgress() {
InstanceProgress instanceProgress = new InstanceProgress();
for (int page = 0; ; page++) {
List<WorkChunk> chunks = myJobPersistence.fetchWorkChunksWithoutData(myInstance.getInstanceId(), JobMaintenanceServiceImpl.INSTANCES_PER_PASS, page);
Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(myInstance.getInstanceId(), false);
for (WorkChunk chunk : chunks) {
myProgressAccumulator.addChunk(chunk);
instanceProgress.addChunk(chunk);
}
if (chunks.size() < JobMaintenanceServiceImpl.INSTANCES_PER_PASS) {
break;
}
while (workChunkIterator.hasNext()) {
WorkChunk next = workChunkIterator.next();
myProgressAccumulator.addChunk(next);
instanceProgress.addChunk(next);
}
instanceProgress.updateInstance(myInstance);

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -30,6 +31,8 @@ public abstract class BaseBatch2Test {
public static final String DATA_3_VALUE = "data 3 value";
public static final String DATA_4_VALUE = "data 4 value";
public static final String REDUCTION_JOB_ID = "REDUCTION_JOB_ID";
@Mock
protected IJobStepWorker<TestJobParameters, VoidModel, TestJobStep2InputType> myStep1Worker;
@Mock
@ -37,12 +40,19 @@ public abstract class BaseBatch2Test {
@Mock
protected IJobStepWorker<TestJobParameters, TestJobStep3InputType, VoidModel> myStep3Worker;
@Mock
protected IReductionStepWorker<TestJobParameters, TestJobStep3InputType, TestJobReductionOutputType> myReductionStepWorker;
@Nonnull
static JobInstance createInstance() {
return createInstance(JOB_DEFINITION_ID);
}
static JobInstance createInstance(String theJobId) {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.IN_PROGRESS);
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setJobDefinitionId(theJobId);
instance.setJobDefinitionVersion(1);
instance.setParameters(new TestJobParameters()
.setParam1(PARAM_1_VALUE)
@ -71,4 +81,25 @@ public abstract class BaseBatch2Test {
return builder.build();
}
@SafeVarargs
protected final JobDefinition<TestJobParameters> createJobDefinitionWithReduction(Consumer<JobDefinition.Builder<TestJobParameters, ?>>... theModifiers) {
// create a job reduction
JobDefinition.Builder<TestJobParameters, TestJobReductionOutputType> builder = JobDefinition
.newBuilder()
.setJobDefinitionId(REDUCTION_JOB_ID)
.setJobDescription("Some description")
.setJobDefinitionVersion(1)
.gatedExecution()
.setParametersType(TestJobParameters.class)
.addFirstStep(STEP_1, "Step 1", TestJobStep2InputType.class, myStep1Worker)
.addIntermediateStep(STEP_2, "Step 2", TestJobStep3InputType.class, myStep2Worker)
.addFinalReducerStep(STEP_3, "Step 3", TestJobReductionOutputType.class, myReductionStepWorker);
for (Consumer<JobDefinition.Builder<TestJobParameters, ?>> next : theModifiers) {
next.accept(builder);
}
return builder.build();
}
}

View File

@ -58,6 +58,10 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
private IJobPersistence myJobInstancePersister;
@Mock
private JobDefinitionRegistry myJobDefinitionRegistry;
// The code refactored to keep the same functionality,
// but in this service (so it's a real service here!)
private StepExecutionSvc myJobStepExecutorSvc;
@Captor
private ArgumentCaptor<StepExecutionDetails<TestJobParameters, VoidModel>> myStep1ExecutionDetailsCaptor;
@Captor
@ -73,7 +77,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
@BeforeEach
public void beforeEach() {
mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry);
myJobStepExecutorSvc = new StepExecutionSvc(myJobInstancePersister, myBatchJobSender);
mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry, myJobStepExecutorSvc);
}
@Test
@ -274,7 +279,6 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
verify(myJobInstancePersister, times(1)).incrementWorkChunkErrorCount(eq(CHUNK_ID), eq(2));
verify(myJobInstancePersister, times(1)).markWorkChunkAsCompletedAndClearData(eq(CHUNK_ID), eq(50));
}
@Test
@ -483,8 +487,13 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
@Nonnull
private JobWorkNotification createWorkNotification(String theStepId) {
return createWorkNotification(JOB_DEFINITION_ID, theStepId);
}
@Nonnull
private JobWorkNotification createWorkNotification(String theJobId, String theStepId) {
JobWorkNotification payload = new JobWorkNotification();
payload.setJobDefinitionId(JOB_DEFINITION_ID);
payload.setJobDefinitionId(theJobId);
payload.setJobDefinitionVersion(1);
payload.setInstanceId(INSTANCE_ID);
payload.setChunkId(BaseBatch2Test.CHUNK_ID);
@ -494,9 +503,13 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
@Nonnull
static WorkChunk createWorkChunk(String theTargetStepId, IModelJson theData) {
return createWorkChunk(JOB_DEFINITION_ID, theTargetStepId, theData);
}
static WorkChunk createWorkChunk(String theJobId, String theTargetStepId, IModelJson theData) {
return new WorkChunk()
.setId(CHUNK_ID)
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDefinitionId(theJobId)
.setJobDefinitionVersion(1)
.setTargetStepId(theTargetStepId)
.setData(theData)
@ -509,14 +522,22 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
return createWorkChunk(STEP_1, null);
}
@Nonnull
static WorkChunk createWorkChunkStep2() {
return createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE));
return createWorkChunkStep2(JOB_DEFINITION_ID);
}
@Nonnull
static WorkChunk createWorkChunkStep2(String theJobId) {
return createWorkChunk(theJobId, STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE));
}
@Nonnull
static WorkChunk createWorkChunkStep3() {
return createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE));
return createWorkChunkStep3(JOB_DEFINITION_ID);
}
@Nonnull
static WorkChunk createWorkChunkStep3(String theJobId) {
return createWorkChunk(theJobId, STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE));
}
}

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import com.google.common.collect.Lists;
@ -23,7 +24,10 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.Message;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunk;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
@ -34,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
@ -51,6 +56,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private ISchedulerService mySchedulerService;
@Mock
private IJobPersistence myJobPersistence;
@Mock
private StepExecutionSvc myJobExecutorSvc;
private JobMaintenanceServiceImpl mySvc;
@Captor
private ArgumentCaptor<JobInstance> myInstanceCaptor;
@ -66,16 +73,24 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void beforeEach() {
myJobDefinitionRegistry = new JobDefinitionRegistry();
BatchJobSender batchJobSender = new BatchJobSender(myWorkChannelProducer);
mySvc = new JobMaintenanceServiceImpl(mySchedulerService, myJobPersistence, myJobDefinitionRegistry, batchJobSender);
mySvc = new JobMaintenanceServiceImpl(mySchedulerService,
myJobPersistence,
myJobDefinitionRegistry,
batchJobSender,
myJobExecutorSvc
);
}
@Test
public void testInProgress_CalculateProgress_FirstCompleteButNoOtherStepsYetComplete() {
List<WorkChunk> chunks = new ArrayList<>();
chunks.add(createWorkChunk(STEP_1, null).setStatus(StatusEnum.COMPLETED));
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunk(STEP_1, null).setStatus(StatusEnum.COMPLETED)
));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
mySvc.runMaintenancePass();
@ -84,16 +99,18 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_FirstStepComplete() {
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
List<WorkChunk> chunks = Arrays.asList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
mySvc.runMaintenancePass();
@ -114,18 +131,20 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_InstanceHasErrorButNoChunksAreErrored() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance1 = createInstance();
instance1.setErrorMessage("This is an error message");
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
List<WorkChunk> chunks = Arrays.asList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:01-04:00")),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setErrorCount(2),
createWorkChunkStep2().setStatus(StatusEnum.IN_PROGRESS).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setErrorCount(2),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
JobInstance instance1 = createInstance();
instance1.setErrorMessage("This is an error message");
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
// Execute
mySvc.runMaintenancePass();
@ -146,11 +165,13 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_GatedExecution_FirstStepComplete() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList(
List<WorkChunk> chunks = Arrays.asList(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
));
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator());
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
@ -185,17 +206,31 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_AllStepsComplete() {
// Setup
List<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(t -> t.completionHandler(myCompletionHandler)));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenReturn(chunks.iterator());
// Execute
@ -223,16 +258,30 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void testInProgress_CalculateProgress_OneStepFailed() {
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.FAILED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25).setErrorMessage("This is an error message")
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
chunks.add(
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition());
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(createInstance()));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), anyInt(), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep1().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:01:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:01-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.FAILED).setStartTime(parseTime("2022-02-12T14:00:02-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25).setErrorMessage("This is an error message"),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:03-04:00")).setEndTime(parseTime("2022-02-12T14:06:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep2().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:00:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25),
createWorkChunkStep3().setStatus(StatusEnum.COMPLETED).setStartTime(parseTime("2022-02-12T14:01:00-04:00")).setEndTime(parseTime("2022-02-12T14:10:00-04:00")).setRecordsProcessed(25)
));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenReturn(chunks.iterator());
mySvc.runMaintenancePass();
@ -258,11 +307,16 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void afterFirstMaintenancePass() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
));
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenReturn(chunks.iterator());
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
@ -282,11 +336,16 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
@Test
public void afterSecondMaintenancePass() {
// Setup
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchWorkChunksWithoutData(eq(INSTANCE_ID), eq(100), eq(0))).thenReturn(Lists.newArrayList(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID),
ArrayList<WorkChunk> chunks = new ArrayList<>();
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID)
);
chunks.add(
createWorkChunkStep2().setStatus(StatusEnum.QUEUED).setId(CHUNK_ID_2)
));
);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenReturn(chunks.iterator());
JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));

View File

@ -0,0 +1,161 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class ReductionStepDataSinkTest {
private static final String INSTANCE_ID = "instanceId";
private static class TestJobParameters implements IModelJson { }
private static class StepInputData implements IModelJson { }
private static class StepOutputData implements IModelJson {
@JsonProperty("data")
private final String myData;
public StepOutputData(String theData) {
myData = theData;
}
}
private ReductionStepDataSink<TestJobParameters, StepInputData, StepOutputData> myDataSink;
@Mock
private JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> myWorkCursor;
@Mock
private JobDefinition<TestJobParameters> myJobDefinition;
@Mock
private IJobPersistence myJobPersistence;
@Mock
private Appender<ILoggingEvent> myListAppender;
private Logger ourLogger;
@BeforeEach
private void init() {
when(myJobDefinition.getJobDefinitionId())
.thenReturn("jobDefinition");
when(myWorkCursor.getJobDefinition())
.thenReturn(myJobDefinition);
myDataSink = new ReductionStepDataSink<>(
INSTANCE_ID,
myWorkCursor,
myJobDefinition,
myJobPersistence
);
ourLogger = (Logger) LoggerFactory.getLogger(ReductionStepDataSink.class);
ourLogger.addAppender(myListAppender);
}
@Test
public void accept_validInputSubmittedOnlyOnce_updatesInstanceWithData() {
// setup
String data = "data";
StepOutputData stepData = new StepOutputData(data);
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(stepData);
// when
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID)))
.thenReturn(Optional.of(createInstance()));
// test
myDataSink.accept(chunkData);
// verify
ArgumentCaptor<JobInstance> instanceCaptor = ArgumentCaptor.forClass(JobInstance.class);
verify(myJobPersistence)
.updateInstance(instanceCaptor.capture());
assertEquals(JsonUtil.serialize(stepData, false), instanceCaptor.getValue().getReport());
}
@Test
public void accept_multipleCalls_firstInWins() {
// setup
String data = "data";
String data2 = "data2";
WorkChunkData<StepOutputData> firstData = new WorkChunkData<>(new StepOutputData(data));
WorkChunkData<StepOutputData> secondData = new WorkChunkData<>(new StepOutputData(data2));
ourLogger.setLevel(Level.ERROR);
// when
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID)))
.thenReturn(Optional.of(createInstance()));
// test
myDataSink.accept(firstData);
myDataSink.accept(secondData);
// verify
ArgumentCaptor<ILoggingEvent> logCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
verify(myListAppender).doAppend(logCaptor.capture());
assertEquals(1, logCaptor.getAllValues().size());
ILoggingEvent log = logCaptor.getValue();
assertTrue(log.getFormattedMessage().contains(
"Report has already been set. Now it is being overwritten. Last in will win!"
));
}
@Test
public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
// setup
String data = "data";
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(new StepOutputData(data));
// when
when(myJobPersistence.fetchInstance(anyString()))
.thenReturn(Optional.empty());
// test
try {
myDataSink.accept(chunkData);
fail("Expected exception to be thrown");
} catch (JobExecutionFailedException ex) {
assertTrue(ex.getMessage().contains("No instance found with Id " + INSTANCE_ID));
} catch (Exception anyOtherEx) {
fail(anyOtherEx.getMessage());
}
}
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
return instance;
}
}

View File

@ -0,0 +1,652 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.ILastJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.ReductionStepExecutionDetails;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionReductionStep;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked", "rawtypes"})
@ExtendWith(MockitoExtension.class)
public class StepExecutionSvcTest {
private static final String INSTANCE_ID = "instanceId";
private static final String JOB_DEFINITION_ID = "jobDefId";
// static internal use classes
private enum StepType {
REDUCTION,
INTERMEDIATE,
FINAL
}
private static class TestJobParameters implements IModelJson { }
private static class StepInputData implements IModelJson { }
private static class StepOutputData implements IModelJson { }
private static class TestDataSink<OT extends IModelJson> extends BaseDataSink<TestJobParameters, StepInputData, OT> {
private BaseDataSink<?, ?, ?> myActualDataSink;
TestDataSink(JobWorkCursor<TestJobParameters, StepInputData, OT> theWorkCursor) {
super(INSTANCE_ID,
theWorkCursor);
}
public void setDataSink(BaseDataSink<?, ?, ?> theSink) {
myActualDataSink = theSink;
}
@Override
public void accept(WorkChunkData<OT> theData) {
}
@Override
public int getWorkChunkCount() {
return 0;
}
}
// our test class
private class TestStepExecutionSvc extends StepExecutionSvc {
public TestStepExecutionSvc(IJobPersistence thePersistence, BatchJobSender theSender) {
super(thePersistence, theSender);
}
@Override
protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> BaseDataSink<PT, IT, OT> getDataSink(
JobWorkCursor<PT, IT, OT> theCursor,
JobDefinition<PT> theJobDefinition,
String theInstanceId
) {
// cause we don't want to test the actual DataSink class here!
myDataSink.setDataSink(super.getDataSink(theCursor, theJobDefinition, theInstanceId));
return (BaseDataSink<PT, IT, OT>) myDataSink;
}
}
// general mocks
private TestDataSink myDataSink;
// step worker mocks
private final IJobStepWorker<TestJobParameters, StepInputData, StepOutputData> myNonReductionStep = mock(IJobStepWorker.class);
private final IReductionStepWorker<TestJobParameters, StepInputData, StepOutputData> myReductionStep = mock(IReductionStepWorker.class);
private final ILastJobStepWorker<TestJobParameters, StepInputData> myLastStep = mock(ILastJobStepWorker.class);
// class specific mocks
@Mock
private IJobPersistence myJobPersistence;
@Mock
private BatchJobSender myJobSender;
private TestStepExecutionSvc myExecutorSvc;
@BeforeEach
public void init() {
myExecutorSvc = new TestStepExecutionSvc(myJobPersistence, myJobSender);
}
private <OT extends IModelJson> JobDefinitionStep<TestJobParameters, StepInputData, OT> mockOutWorkCursor(
StepType theStepType,
JobWorkCursor<TestJobParameters, StepInputData, OT> theWorkCursor,
boolean theMockOutTargetStep,
boolean mockFinalWorkCursor
) {
JobDefinition<TestJobParameters> jobDefinition = createTestJobDefinition(
theStepType == StepType.REDUCTION
);
JobDefinitionStep<TestJobParameters, StepInputData, OT> step = (JobDefinitionStep<TestJobParameters, StepInputData, OT>) getJobDefinitionStep(
"stepId",
theStepType
);
when(theWorkCursor.getJobDefinition())
.thenReturn(jobDefinition);
when(theWorkCursor.getCurrentStep())
.thenReturn(step);
myDataSink = spy(new TestDataSink<>(theWorkCursor));
if (theMockOutTargetStep) {
when(myDataSink.getTargetStep())
.thenReturn(step);
}
if (mockFinalWorkCursor) {
JobWorkCursor<TestJobParameters, StepInputData, VoidModel> finalWorkCursor = mock(JobWorkCursor.class);
when(finalWorkCursor.getJobDefinition())
.thenReturn(jobDefinition);
when(theWorkCursor.asFinalCursor())
.thenReturn(finalWorkCursor);
}
return step;
}
@Test
public void doExecution_reductionStepWithValidInput_executesAsExpected() {
// setup
List<String> chunkIds = Arrays.asList("chunk1", "chunk2");
List<WorkChunk> chunks = new ArrayList<>();
for (String id : chunkIds) {
chunks.add(createWorkChunk(id));
}
JobInstance jobInstance = getTestJobInstance();
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.REDUCTION, workCursor, true, false);
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(true)))
.thenReturn(chunks.iterator());
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS());
when(myReductionStep.run(
any(StepExecutionDetails.class), any(IJobDataSink.class)
)).thenReturn(RunOutcome.SUCCESS);
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
null
);
// verify
ArgumentCaptor<ChunkExecutionDetails> chunkCaptor = ArgumentCaptor.forClass(ChunkExecutionDetails.class);
verify(myReductionStep, times(chunks.size()))
.consume(chunkCaptor.capture());
List<ChunkExecutionDetails> chunksSubmitted = chunkCaptor.getAllValues();
assertEquals(chunks.size(), chunksSubmitted.size());
for (ChunkExecutionDetails submitted : chunksSubmitted) {
assertTrue(chunkIds.contains(submitted.getChunkId()));
}
assertTrue(result.isSuccessful());
assertTrue(myDataSink.myActualDataSink instanceof ReductionStepDataSink);
ArgumentCaptor<StepExecutionDetails> executionDetsCaptor = ArgumentCaptor.forClass(StepExecutionDetails.class);
verify(myReductionStep).run(executionDetsCaptor.capture(), eq(myDataSink));
assertTrue(executionDetsCaptor.getValue() instanceof ReductionStepExecutionDetails);
ArgumentCaptor<List<String>> chunkIdCaptor = ArgumentCaptor.forClass(List.class);
verify(myJobPersistence).markWorkChunksWithStatusAndWipeData(eq(INSTANCE_ID),
chunkIdCaptor.capture(), eq(StatusEnum.COMPLETED), eq(null));
List<String> capturedIds = chunkIdCaptor.getValue();
assertEquals(chunkIds.size(), capturedIds.size());
for (String chunkId : chunkIds) {
assertTrue(capturedIds.contains(chunkId));
}
// nevers
verifyNoErrors(0);
verify(myNonReductionStep, never()).run(any(), any());
verify(myLastStep, never()).run(any(), any());
}
@Test
public void doExecution_reductionStepWithErrors_returnsFalseAndMarksPreviousChunksFailed() {
// setup
String errorMsg = "Exceptional!";
List<String> chunkIds = Arrays.asList("chunk1", "chunk2");
List<WorkChunk> chunks = new ArrayList<>();
for (String id : chunkIds) {
chunks.add(createWorkChunk(id));
}
JobInstance jobInstance = getTestJobInstance();
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.REDUCTION, workCursor, false, false);
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(true)))
.thenReturn(chunks.iterator());
doThrow(new RuntimeException(errorMsg))
.when(myReductionStep).consume(any(ChunkExecutionDetails.class));
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
null
);
// verify
assertFalse(result.isSuccessful());
ArgumentCaptor<String> chunkIdCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> errorCaptor = ArgumentCaptor.forClass(String.class);
verify(myJobPersistence, times(chunkIds.size()))
.markWorkChunkAsFailed(chunkIdCaptor.capture(), errorCaptor.capture());
List<String> chunkIdsCaptured = chunkIdCaptor.getAllValues();
List<String> errorsCaptured = errorCaptor.getAllValues();
for (int i = 0; i < chunkIds.size(); i++) {
String cId = chunkIdsCaptured.get(i);
String error = errorsCaptured.get(i);
assertTrue(chunkIds.contains(cId));
assertTrue(error.contains("Reduction step failed to execute chunk reduction for chunk"));
}
verify(myJobPersistence, never())
.markWorkChunksWithStatusAndWipeData(anyString(), anyList(), any(), anyString());
verify(myReductionStep, never())
.run(any(), any());
}
@Test
public void doExecution_reductionStepWithChunkFailures_marksChunkAsFailedButExecutesRestAsSuccess() {
// setup
List<String> chunkIds = Arrays.asList("chunk1", "chunk2");
List<WorkChunk> chunks = new ArrayList<>();
for (String id : chunkIds) {
chunks.add(createWorkChunk(id));
}
JobInstance jobInstance = getTestJobInstance();
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.REDUCTION, workCursor, true, false);
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(true)))
.thenReturn(chunks.iterator());
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS())
.thenReturn(new ChunkOutcome(ChunkOutcome.Status.FAIL));
when(myReductionStep.run(any(StepExecutionDetails.class), any(BaseDataSink.class)))
.thenReturn(RunOutcome.SUCCESS);
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
null
);
// verify
assertFalse(result.isSuccessful());
verify(myJobPersistence)
.markWorkChunkAsFailed(eq(chunkIds.get(1)), anyString());
ArgumentCaptor<List> chunkListCaptor = ArgumentCaptor.forClass(List.class);
verify(myJobPersistence)
.markWorkChunksWithStatusAndWipeData(eq(INSTANCE_ID),
chunkListCaptor.capture(),
eq(StatusEnum.COMPLETED),
any());
List<String> completedIds = chunkListCaptor.getValue();
assertEquals(1, completedIds.size());
assertEquals(chunkIds.get(0), completedIds.get(0));
}
@Test
public void doExecution_reductionWithChunkAbort_marksAllFutureChunksAsFailedButPreviousAsSuccess() {
// setup
List<String> chunkIds = Arrays.asList("chunk1", "chunk2");
List<WorkChunk> chunks = new ArrayList<>();
for (String id : chunkIds) {
chunks.add(createWorkChunk(id));
}
JobInstance jobInstance = getTestJobInstance();
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.REDUCTION, workCursor, true, false);
// when
when(workCursor.isReductionStep())
.thenReturn(true);
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(true)))
.thenReturn(chunks.iterator());
when(myReductionStep.consume(any(ChunkExecutionDetails.class)))
.thenReturn(ChunkOutcome.SUCCESS())
.thenReturn(new ChunkOutcome(ChunkOutcome.Status.ABORT));
when(myReductionStep.run(any(StepExecutionDetails.class), any(BaseDataSink.class)))
.thenReturn(RunOutcome.SUCCESS);
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
null
);
// verification
assertFalse(result.isSuccessful());
ArgumentCaptor<List> submittedListIds = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<StatusEnum> statusCaptor = ArgumentCaptor.forClass(StatusEnum.class);
verify(myJobPersistence, times(chunkIds.size()))
.markWorkChunksWithStatusAndWipeData(
eq(INSTANCE_ID),
submittedListIds.capture(),
statusCaptor.capture(),
any()
);
assertEquals(2, submittedListIds.getAllValues().size());
List<String> list1 = submittedListIds.getAllValues().get(0);
List<String> list2 = submittedListIds.getAllValues().get(1);
assertTrue(list1.contains(chunkIds.get(0)));
assertTrue(list2.contains(chunkIds.get(1)));
// assumes the order of which is called first
// successes, then failures
assertEquals(2, statusCaptor.getAllValues().size());
List<StatusEnum> statuses = statusCaptor.getAllValues();
assertEquals(StatusEnum.COMPLETED, statuses.get(0));
assertEquals(StatusEnum.FAILED, statuses.get(1));
}
@Test
public void doExecution_nonReductionIntermediateStepWithValidInput_executesAsExpected() {
doExecution_nonReductionStep(0);
}
@Test
public void doExecution_withRecoveredErrors_marksThoseErrorsToChunks() {
doExecution_nonReductionStep(3);
}
private void doExecution_nonReductionStep(int theRecoveredErrorsForDataSink) {
// setup
JobInstance jobInstance = getTestJobInstance();
WorkChunk chunk = new WorkChunk();
chunk.setId("chunkId");
chunk.setData(new StepInputData());
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.INTERMEDIATE, workCursor, true, false);
// when
when(myNonReductionStep.run(
any(StepExecutionDetails.class), any(IJobDataSink.class)
)).thenReturn(RunOutcome.SUCCESS);
when(myDataSink.getRecoveredErrorCount())
.thenReturn(theRecoveredErrorsForDataSink);
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
chunk
);
// verify
assertTrue(result.isSuccessful());
verify(myJobPersistence)
.markWorkChunkAsCompletedAndClearData(eq(chunk.getId()), anyInt());
assertTrue(myDataSink.myActualDataSink instanceof JobDataSink);
if (theRecoveredErrorsForDataSink > 0) {
verify(myJobPersistence)
.incrementWorkChunkErrorCount(anyString(), eq(theRecoveredErrorsForDataSink));
}
// nevers
verifyNoErrors(theRecoveredErrorsForDataSink);
verifyNonReductionStep();
verify(myLastStep, never()).run(any(), any());
verify(myReductionStep, never()).run(any(), any());
}
@Test
public void doExecution_finalNonReductionStep_executesAsExpected() {
// setup
JobInstance jobInstance = getTestJobInstance();
WorkChunk chunk = new WorkChunk();
chunk.setId("chunkId");
chunk.setData(new StepInputData());
JobWorkCursor<TestJobParameters, StepInputData, VoidModel> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, VoidModel> step = mockOutWorkCursor(StepType.FINAL, workCursor, true, true);
// when
when(workCursor.isFinalStep())
.thenReturn(true);
when(myLastStep.run(any(StepExecutionDetails.class), any(BaseDataSink.class)))
.thenReturn(RunOutcome.SUCCESS);
// test
JobStepExecutorOutput<?, ?, ?> result = myExecutorSvc.doExecution(
workCursor,
jobInstance,
chunk
);
// verify
assertTrue(result.isSuccessful());
assertTrue(myDataSink.myActualDataSink instanceof FinalStepDataSink);
// nevers
verifyNoErrors(0);
verifyNonReductionStep();
verify(myReductionStep, never()).run(any(), any());
verify(myNonReductionStep, never()).run(any(), any());
}
@Test
public void doExecute_stepWorkerThrowsJobExecutionException_marksWorkChunkAsFailed() {
runExceptionThrowingTest(new JobExecutionFailedException("Failure"));
verify(myJobPersistence)
.markWorkChunkAsFailed(anyString(), anyString());
}
@Test
public void doExecution_stepWorkerThrowsRandomException_rethrowsJobStepFailedException() {
String msg = "failure";
try {
runExceptionThrowingTest(new RuntimeException(msg));
fail("Expected Exception to be thrown");
} catch (JobStepFailedException jobStepFailedException) {
assertTrue(jobStepFailedException.getMessage().contains(msg));
} catch (Exception anythingElse) {
fail(anythingElse.getMessage());
}
}
private void runExceptionThrowingTest(Exception theExceptionToThrow) {
// setup
JobInstance jobInstance = getTestJobInstance();
WorkChunk chunk = new WorkChunk();
chunk.setId("chunkId");
chunk.setData(new StepInputData());
JobWorkCursor<TestJobParameters, StepInputData, StepOutputData> workCursor = mock(JobWorkCursor.class);
JobDefinitionStep<TestJobParameters, StepInputData, StepOutputData> step = mockOutWorkCursor(StepType.INTERMEDIATE, workCursor, true, false);
// when
when(myNonReductionStep.run(any(), any()))
.thenThrow(theExceptionToThrow);
// test
JobStepExecutorOutput<?, ?, ?> output = myExecutorSvc.doExecution(
workCursor,
jobInstance,
chunk
);
// verify
assertFalse(output.isSuccessful());
}
/**********************/
private void verifyNoErrors(int theRecoveredErrorCount) {
if (theRecoveredErrorCount == 0) {
verify(myJobPersistence, never())
.incrementWorkChunkErrorCount(anyString(), anyInt());
}
verify(myJobPersistence, never())
.markWorkChunkAsFailed(anyString(), anyString());
verify(myJobPersistence, never())
.markWorkChunkAsErroredAndIncrementErrorCount(anyString(), anyString());
}
private void verifyNonReductionStep() {
verify(myJobPersistence, never())
.fetchWorkChunkSetStartTimeAndMarkInProgress(anyString());
verify(myJobPersistence, never())
.markWorkChunksWithStatusAndWipeData(anyString(), anyList(), any(), any());
verify(myJobPersistence, never())
.fetchAllWorkChunksIterator(anyString(), anyBoolean());
}
private JobInstance getTestJobInstance() {
JobInstance instance = new JobInstance();
instance.setInstanceId(INSTANCE_ID);
instance.setParameters(new TestJobParameters());
return instance;
}
private WorkChunk createWorkChunk(String theId) {
WorkChunk chunk = new WorkChunk();
chunk.setInstanceId(INSTANCE_ID);
chunk.setId(theId);
chunk.setStatus(StatusEnum.QUEUED);
chunk.setData(JsonUtil.serialize(
new StepInputData()
));
return chunk;
}
@SuppressWarnings("unchecked")
private JobDefinition<TestJobParameters> createTestJobDefinition(boolean theWithReductionStep) {
JobDefinition<TestJobParameters> def = null;
if (theWithReductionStep) {
def = JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDescription("Reduction job description")
.setJobDefinitionVersion(1)
.setParametersType(TestJobParameters.class)
.addFirstStep(
"step 1",
"description 1",
VoidModel.class,
mock(IJobStepWorker.class) // we don't care about this step - we just need it
)
.addFinalReducerStep(
"step last",
"description 2",
StepOutputData.class,
myReductionStep
)
.build();
} else {
def = JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEFINITION_ID)
.setJobDescription("Non reduction job description")
.setJobDefinitionVersion(1)
.setParametersType(TestJobParameters.class)
.addFirstStep(
"step 1",
"description 1",
VoidModel.class,
mock(IJobStepWorker.class) // we don't care about this step
)
.addIntermediateStep(
"Step 2",
"description 2",
StepInputData.class,
myNonReductionStep
)
.addLastStep(
"Step 3",
"description 3",
myLastStep
)
.build();
}
return def;
}
private JobDefinitionStep<TestJobParameters, StepInputData, ?> getJobDefinitionStep(
String theId,
StepType theStepType
) {
if (theStepType == StepType.REDUCTION) {
return new JobDefinitionReductionStep<>(
theId,
"i'm a reduction step",
myReductionStep,
StepInputData.class,
StepOutputData.class
);
} else if (theStepType == StepType.INTERMEDIATE) {
return new JobDefinitionStep<>(
theId,
"i'm a step - many like me, but i'm unique",
myNonReductionStep,
StepInputData.class,
StepOutputData.class
);
} else if (theStepType == StepType.FINAL) {
return new JobDefinitionStep<>(
theId,
"I'm a final step",
myLastStep,
StepInputData.class,
VoidModel.class
);
}
/// TODO - log
return null;
}
}

View File

@ -0,0 +1,6 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.model.api.IModelJson;
public class TestJobReductionOutputType implements IModelJson {
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -99,7 +99,6 @@ import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.IdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE2-SNAPSHOT</version>
<version>6.1.0-PRE3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Some files were not shown because too many files have changed in this diff Show More