Add tests. Pull some inner classes out as package-private classes. (#3516)

* Add tests. Pull some inner classes out as package-private classes.

* Review feedback
Ken Stevens 2022-04-02 16:34:26 -04:00 committed by GitHub
parent 60bbe271a5
commit 40de855d17
15 changed files with 470 additions and 174 deletions


@@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.batch2;
*/
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
@@ -28,7 +29,6 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.util.JsonUtil;
import org.apache.commons.lang3.Validate;
import org.springframework.data.domain.PageRequest;
@@ -59,15 +59,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
public String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized) {
public String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) {
Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
entity.setId(UUID.randomUUID().toString());
entity.setSequence(theSequence);
entity.setJobDefinitionId(theJobDefinitionId);
entity.setJobDefinitionVersion(theJobDefinitionVersion);
entity.setTargetStepId(theTargetStepId);
entity.setInstanceId(theInstanceId);
entity.setSerializedData(theDataSerialized);
entity.setSequence(theBatchWorkChunk.sequence);
entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
entity.setTargetStepId(theBatchWorkChunk.targetStepId);
entity.setInstanceId(theBatchWorkChunk.instanceId);
entity.setSerializedData(theBatchWorkChunk.serializedData);
entity.setCreateTime(new Date());
entity.setStatus(StatusEnum.QUEUED);
myWorkChunkRepository.save(entity);
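For orientation, here is a minimal sketch of the call-site change this hunk implies. The identifiers come from the diff; the wrapper class and helper method are hypothetical:

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;

class StoreWorkChunkCallSite {
    // Before this commit, callers passed six loose arguments:
    //   thePersistence.storeWorkChunk("definition-id", 1, "step-id", theInstanceId, 0, theData);
    // Now the same values travel together as one BatchWorkChunk value object.
    static String store(IJobPersistence thePersistence, String theInstanceId, String theData) {
        BatchWorkChunk chunk = new BatchWorkChunk("definition-id", 1, "step-id", theInstanceId, 0, theData);
        return thePersistence.storeWorkChunk(chunk);
    }
}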


@@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
@@ -8,11 +9,9 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.util.JsonUtil;
import com.github.jsonldjava.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
@@ -20,17 +19,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.in;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -40,7 +35,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@TestMethodOrder(MethodOrderer.MethodName.class)
public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public static final String JOB_DEFINITION_ID = "definition-id";
public static final String TARGET_STEP_ID = "step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final String STEP_CHUNK_ID = "step-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;
public static final String CHUNK_DATA = "{\"key\":\"value\"}";
@Autowired
private IJobPersistence mySvc;
@Autowired
@@ -55,7 +57,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")));
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")));
}
// Execute
@@ -70,6 +72,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
});
}
private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData);
return mySvc.storeWorkChunk(batchWorkChunk);
}
@Test
public void testDeleteChunks() {
// Setup
@@ -77,7 +84,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
for (int i = 0; i < 10; i++) {
mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, CHUNK_DATA);
storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
}
// Execute
@@ -104,8 +111,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
assertEquals(instanceId, foundInstance.getInstanceId());
assertEquals("definition-id", foundInstance.getJobDefinitionId());
assertEquals(1, foundInstance.getJobDefinitionVersion());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
@@ -131,8 +138,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
assertEquals(instanceId, foundInstance.getInstanceId());
assertEquals("definition-id", foundInstance.getJobDefinitionId());
assertEquals(1, foundInstance.getJobDefinitionVersion());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertTrue(foundInstance.isCancelled());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
@@ -146,8 +153,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
assertEquals(36, foundInstance.getInstanceId().length());
assertEquals("definition-id", foundInstance.getJobDefinitionId());
assertEquals(1, foundInstance.getJobDefinitionVersion());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
}
@@ -159,7 +166,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
List<String> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, CHUNK_DATA);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA);
ids.add(id);
}
@@ -197,7 +204,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, 0, null);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
assertEquals(null, chunk.getData());
@@ -208,14 +215,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, 0, CHUNK_DATA);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
assertEquals(36, chunk.getInstanceId().length());
assertEquals("definition-id", chunk.getJobDefinitionId());
assertEquals(1, chunk.getJobDefinitionVersion());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
@@ -226,7 +233,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public void testMarkChunkAsCompleted_Success() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, CHUNK_DATA);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
@@ -234,7 +241,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
assertEquals(1, chunk.getSequence());
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
assertNotNull(chunk.getStartTime());
@@ -267,7 +274,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
// Execute
@@ -286,7 +293,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
@@ -294,7 +301,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
assertEquals(1, chunk.getSequence());
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
@@ -336,7 +343,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public void testMarkChunkAsCompleted_Fail() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null);
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
@@ -344,7 +351,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
assertEquals(1, chunk.getSequence());
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
@@ -413,9 +420,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
@Nonnull
private JobInstance createInstance() {
JobInstance instance = new JobInstance();
instance.setJobDefinitionId("definition-id");
instance.setJobDefinitionId(JOB_DEFINITION_ID);
instance.setStatus(StatusEnum.QUEUED);
instance.setJobDefinitionVersion(1);
instance.setJobDefinitionVersion(JOB_DEF_VER);
instance.setParameters(CHUNK_DATA);
return instance;
}


@@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
import ca.uhn.fhir.batch2.impl.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
@@ -34,20 +35,16 @@ public interface IJobPersistence {
* <p>
* Chunk should be stored with a status of {@link ca.uhn.fhir.batch2.model.StatusEnum#QUEUED}
*
* @param theJobDefinitionId The job definition ID
* @param theJobDefinitionVersion The job definition version
* @param theTargetStepId The step ID that will be responsible for consuming this chunk
* @param theInstanceId The instance ID associated with this chunk
* @param theDataSerialized The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON)
* @return Returns a globally unique identifier for this chunk. This should be a sequentially generated ID, a UUID, or something like that which is guaranteed to never overlap across jobs or instances.
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk. This should be a sequentially generated ID, a UUID, or something like that which is guaranteed to never overlap across jobs or instances.
*/
String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized);
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
/**
* Fetches a chunk of work from storage, and update the stored status
* to {@link ca.uhn.fhir.batch2.model.StatusEnum#IN_PROGRESS}
*
* @param theChunkId The ID, as returned by {@link #storeWorkChunk(String, int, String, String, int, String)}
@param theChunkId The ID, as returned by {@link #storeWorkChunk(BatchWorkChunk)}
* @return The chunk of work
*/
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
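A short sketch of claiming a stored chunk through this interface, assuming a wired IJobPersistence implementation; the helper is hypothetical, and the orElseThrow pattern mirrors the tests above:

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.WorkChunk;

class ClaimChunkExample {
    // Fetching flips the stored status from QUEUED to IN_PROGRESS;
    // the Optional is empty when the chunk ID is unknown.
    static WorkChunk claim(IJobPersistence thePersistence, String theChunkId) {
        return thePersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(theChunkId)
            .orElseThrow(() -> new IllegalArgumentException("Unknown chunk: " + theChunkId));
    }
}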


@@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.impl.BatchJobSender;
import ca.uhn.fhir.batch2.impl.JobCleanerServiceImpl;
import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry;
@@ -33,7 +34,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -48,9 +48,9 @@ public abstract class BaseBatch2Config {
}
@Bean
public IJobCoordinator batch2JobCoordinator(@Autowired IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) {
public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) {
return new JobCoordinatorImpl(
batch2ProcessingChannelProducer(theChannelFactory),
new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory)),
batch2ProcessingChannelReceiver(theChannelFactory),
theJobInstancePersister,
theJobDefinitionRegistry
@@ -58,19 +58,19 @@ }
}
@Bean
public IJobCleanerService batch2JobCleaner(@Autowired ISchedulerService theSchedulerService, @Autowired IJobPersistence theJobPersistence) {
public IJobCleanerService batch2JobCleaner(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) {
return new JobCleanerServiceImpl(theSchedulerService, theJobPersistence);
}
@Bean
public IChannelProducer batch2ProcessingChannelProducer(@Autowired IChannelFactory theChannelFactory) {
public IChannelProducer batch2ProcessingChannelProducer(IChannelFactory theChannelFactory) {
ChannelProducerSettings settings = new ChannelProducerSettings()
.setConcurrentConsumers(getConcurrentConsumers());
return theChannelFactory.getOrCreateProducer(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings);
}
@Bean
public IChannelReceiver batch2ProcessingChannelReceiver(@Autowired IChannelFactory theChannelFactory) {
public IChannelReceiver batch2ProcessingChannelReceiver(IChannelFactory theChannelFactory) {
ChannelConsumerSettings settings = new ChannelConsumerSettings()
.setConcurrentConsumers(getConcurrentConsumers());
return theChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings);


@@ -0,0 +1,35 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.model.api.IModelJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class BaseDataSink<OT extends IModelJson> implements IJobDataSink<OT> {
private static final Logger ourLog = LoggerFactory.getLogger(BaseDataSink.class);
private final String myInstanceId;
private final String myCurrentStepId;
private int myRecoveredErrorCount;
protected BaseDataSink(String theInstanceId, String theCurrentStepId) {
myInstanceId = theInstanceId;
myCurrentStepId = theCurrentStepId;
}
public String getInstanceId() {
return myInstanceId;
}
@Override
public void recoveredError(String theMessage) {
ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myCurrentStepId, theMessage);
myRecoveredErrorCount++;
}
public int getRecoveredErrorCount() {
return myRecoveredErrorCount;
}
public abstract int getWorkChunkCount();
}


@@ -0,0 +1,24 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BatchJobSender {
private static final Logger ourLog = LoggerFactory.getLogger(BatchJobSender.class);
private final IChannelProducer myWorkChannelProducer;
public BatchJobSender(IChannelProducer theWorkChannelProducer) {
myWorkChannelProducer = theWorkChannelProducer;
}
void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) {
JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage();
message.setPayload(theJobWorkNotification);
ourLog.info("Sending work notification for {}", theJobWorkNotification);
myWorkChannelProducer.send(message);
}
}
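A usage sketch for the extracted sender, assuming a caller in the same ca.uhn.fhir.batch2.impl package (sendWorkChannelMessage is deliberately package-private); the literal values are illustrative:

package ca.uhn.fhir.batch2.impl;

import ca.uhn.fhir.batch2.model.JobWorkNotification;

class BatchJobSenderUsage {
    // Builds a notification with the new convenience constructor
    // (see JobWorkNotification below) and hands it to the sender.
    static void notifyWorker(BatchJobSender theSender, String theInstanceId, String theChunkId) {
        JobWorkNotification notification = new JobWorkNotification("definition-id", 1, theInstanceId, "step-id", theChunkId);
        theSender.sendWorkChannelMessage(notification);
    }
}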


@@ -0,0 +1,56 @@
package ca.uhn.fhir.batch2.impl;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
public class BatchWorkChunk {
public final String jobDefinitionId;
public final int jobDefinitionVersion;
public final String targetStepId;
public final String instanceId;
public final int sequence;
public final String serializedData;
/**
* Constructor
*
* @param theJobDefinitionId The job definition ID
* @param theJobDefinitionVersion The job definition version
* @param theTargetStepId The step ID that will be responsible for consuming this chunk
* @param theInstanceId The instance ID associated with this chunk
* @param theSerializedData The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON)
*/
public BatchWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
jobDefinitionId = theJobDefinitionId;
jobDefinitionVersion = theJobDefinitionVersion;
targetStepId = theTargetStepId;
instanceId = theInstanceId;
sequence = theSequence;
serializedData = theSerializedData;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
BatchWorkChunk that = (BatchWorkChunk) theO;
return new EqualsBuilder()
.append(jobDefinitionVersion, that.jobDefinitionVersion)
.append(sequence, that.sequence)
.append(jobDefinitionId, that.jobDefinitionId)
.append(targetStepId, that.targetStepId)
.append(instanceId, that.instanceId)
.append(serializedData, that.serializedData)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(jobDefinitionId).append(jobDefinitionVersion).append(targetStepId).append(instanceId).append(sequence).append(serializedData).toHashCode();
}
}
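The value-object equals/hashCode is what lets tests verify a stored chunk with a single Mockito eq(...) matcher instead of six separate argument matchers (see JobCoordinatorImplTest below). A minimal sketch of that semantics:

import ca.uhn.fhir.batch2.impl.BatchWorkChunk;

class BatchWorkChunkEqualityDemo {
    // Two chunks built from the same six fields compare equal and hash alike,
    // so eq(expectedWorkChunk) matches the argument passed to storeWorkChunk(...).
    public static void main(String[] theArgs) {
        BatchWorkChunk a = new BatchWorkChunk("definition-id", 1, "step-id", "instance-id", 0, null);
        BatchWorkChunk b = new BatchWorkChunk("definition-id", 1, "step-id", "instance-id", 0, null);
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}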


@@ -0,0 +1,34 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FinalStepDataSink extends BaseDataSink<VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(FinalStepDataSink.class);
private final String myJobDefinitionId;
/**
* Constructor
*/
FinalStepDataSink(String theJobDefinitionId, String theInstanceId, String theCurrentStepId) {
super(theInstanceId, theCurrentStepId);
myJobDefinitionId = theJobDefinitionId;
}
@Override
public void accept(WorkChunkData<VoidModel> theData) {
String msg = "Illegal attempt to store data during final step of job " + myJobDefinitionId;
ourLog.error(msg);
throw new JobExecutionFailedException(Msg.code(2045) + msg);
}
@Override
public int getWorkChunkCount() {
return 0;
}
}


@@ -21,7 +21,6 @@ package ca.uhn.fhir.batch2.impl;
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
@@ -37,9 +36,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.annotation.PasswordField;
@@ -68,7 +65,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@@ -77,7 +73,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinator {
private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImpl.class);
private final IChannelProducer myWorkChannelProducer;
private final BatchJobSender myBatchJobSender;
private final IChannelReceiver myWorkChannelReceiver;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final MessageHandler myReceiverHandler = new WorkChannelMessageHandler();
@@ -87,12 +83,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
* Constructor
*/
public JobCoordinatorImpl(
@Nonnull IChannelProducer theWorkChannelProducer,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IChannelReceiver theWorkChannelReceiver,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
super(theJobPersistence);
myWorkChannelProducer = theWorkChannelProducer;
myBatchJobSender = theBatchJobSender;
myWorkChannelReceiver = theWorkChannelReceiver;
myJobDefinitionRegistry = theJobDefinitionRegistry;
}
@@ -120,9 +116,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
instance.setParameters(theStartRequest.getParameters());
String instanceId = myJobPersistence.storeNewInstance(instance);
String chunkId = myJobPersistence.storeWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null);
sendWorkChannelMessage(jobDefinitionId, jobDefinitionVersion, instanceId, firstStepId, chunkId);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, firstStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
return instanceId;
}
@@ -245,20 +244,6 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
myWorkChannelReceiver.unsubscribe(myReceiverHandler);
}
private void sendWorkChannelMessage(String theJobDefinitionId, int jobDefinitionVersion, String theInstanceId, String theTargetStepId, String theChunkId) {
JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage();
JobWorkNotification workNotification = new JobWorkNotification();
workNotification.setJobDefinitionId(theJobDefinitionId);
workNotification.setJobDefinitionVersion(jobDefinitionVersion);
workNotification.setChunkId(theChunkId);
workNotification.setInstanceId(theInstanceId);
workNotification.setTargetStepId(theTargetStepId);
message.setPayload(workNotification);
ourLog.info("Sending work notification for job[{}] instance[{}] step[{}] chunk[{}]", theJobDefinitionId, theInstanceId, theTargetStepId, theChunkId);
myWorkChannelProducer.send(message);
}
private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) {
JobWorkNotification payload = theMessage.getPayload();
@@ -321,7 +306,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
BaseDataSink<OT> dataSink;
if (theSubsequentStep != null) {
dataSink = new JobDataSink<>(jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId());
dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId());
} else {
dataSink = (BaseDataSink<OT>) new FinalStepDataSink(jobDefinitionId, instanceId, theStep.getStepId());
}
@@ -382,90 +367,4 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato
handleWorkChannelMessage((JobWorkNotificationJsonMessage) theMessage);
}
}
private abstract class BaseDataSink<OT extends IModelJson> implements IJobDataSink<OT> {
private final String myInstanceId;
private final String myCurrentStepId;
private int myRecoveredErrorCount;
protected BaseDataSink(String theInstanceId, String theCurrentStepId) {
myInstanceId = theInstanceId;
myCurrentStepId = theCurrentStepId;
}
public String getInstanceId() {
return myInstanceId;
}
@Override
public void recoveredError(String theMessage) {
ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myCurrentStepId, theMessage);
myRecoveredErrorCount++;
}
public int getRecoveredErrorCount() {
return myRecoveredErrorCount;
}
public abstract int getWorkChunkCount();
}
private class JobDataSink<OT extends IModelJson> extends BaseDataSink<OT> {
private final String myJobDefinitionId;
private final int myJobDefinitionVersion;
private final JobDefinitionStep<?, ?, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
public JobDataSink(String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep<?, ?, ?> theTargetStep, String theInstanceId, String theCurrentStepId) {
super(theInstanceId, theCurrentStepId);
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
myTargetStep = theTargetStep;
}
@Override
public void accept(WorkChunkData<OT> theData) {
String jobDefinitionId = myJobDefinitionId;
int jobDefinitionVersion = myJobDefinitionVersion;
String instanceId = getInstanceId();
String targetStepId = myTargetStep.getStepId();
int sequence = myChunkCounter.getAndIncrement();
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);
String chunkId = myJobPersistence.storeWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
sendWorkChannelMessage(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId);
}
@Override
public int getWorkChunkCount() {
return myChunkCounter.get();
}
}
private class FinalStepDataSink extends BaseDataSink<VoidModel> {
private final String myJobDefinitionId;
/**
* Constructor
*/
private FinalStepDataSink(String theJobDefinitionId, String theInstanceId, String theCurrentStepId) {
super(theInstanceId, theCurrentStepId);
myJobDefinitionId = theJobDefinitionId;
}
@Override
public void accept(WorkChunkData<VoidModel> theData) {
String msg = "Illegal attempt to store data during final step of job " + myJobDefinitionId;
ourLog.error(msg);
throw new JobExecutionFailedException(Msg.code(2045) + msg);
}
@Override
public int getWorkChunkCount() {
return 0;
}
}
}


@@ -0,0 +1,51 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import java.util.concurrent.atomic.AtomicInteger;
class JobDataSink<OT extends IModelJson> extends BaseDataSink<OT> {
private final BatchJobSender myBatchJobSender;
private final IJobPersistence myJobPersistence;
private final String myJobDefinitionId;
private final int myJobDefinitionVersion;
private final JobDefinitionStep<?, ?, ?> myTargetStep;
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep<?, ?, ?> theTargetStep, String theInstanceId, String theCurrentStepId) {
super(theInstanceId, theCurrentStepId);
myBatchJobSender = theBatchJobSender;
myJobPersistence = theJobPersistence;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
myTargetStep = theTargetStep;
}
@Override
public void accept(WorkChunkData<OT> theData) {
String jobDefinitionId = myJobDefinitionId;
int jobDefinitionVersion = myJobDefinitionVersion;
String instanceId = getInstanceId();
String targetStepId = myTargetStep.getStepId();
int sequence = myChunkCounter.getAndIncrement();
OT dataValue = theData.getData();
String dataValueString = JsonUtil.serialize(dataValue, false);
BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk);
JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId);
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
@Override
public int getWorkChunkCount() {
return myChunkCounter.get();
}
}


@@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -40,8 +39,8 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence {
}
@Override
public synchronized String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized) {
return myWrap.storeWorkChunk(theJobDefinitionId, theJobDefinitionVersion, theTargetStepId, theInstanceId, theSequence, theDataSerialized);
public synchronized String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) {
return myWrap.storeWorkChunk(theBatchWorkChunk);
}
@Override


@@ -23,6 +23,8 @@ package ca.uhn.fhir.batch2.model;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nonnull;
public class JobWorkNotification implements IModelJson {
@JsonProperty(value = "jobDefinitionId")
@@ -40,6 +42,17 @@ public class JobWorkNotification implements IModelJson {
@JsonProperty(value = "instanceId")
private String myInstanceId;
public JobWorkNotification() {
}
public JobWorkNotification(@Nonnull String theJobDefinitionId, int jobDefinitionVersion, @Nonnull String theInstanceId, @Nonnull String theTargetStepId, @Nonnull String theChunkId) {
setJobDefinitionId(theJobDefinitionId);
setJobDefinitionVersion(jobDefinitionVersion);
setChunkId(theChunkId);
setInstanceId(theInstanceId);
setTargetStepId(theTargetStepId);
}
public String getJobDefinitionId() {
return myJobDefinitionId;
}
@@ -79,4 +92,9 @@ public class JobWorkNotification implements IModelJson {
public String getInstanceId() {
return myInstanceId;
}
@Override
public String toString() {
return String.format("job[%s] instance[%s] step[%s] chunk[%s]", myJobDefinitionId, myInstanceId, myTargetStepId, myChunkId);
}
}


@@ -14,7 +14,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.model.api.IModelJson;
@@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -70,7 +68,7 @@ public class JobCoordinatorImplTest {
private final IChannelReceiver myWorkChannelReceiver = LinkedBlockingChannel.newSynchronous("receiver");
private JobCoordinatorImpl mySvc;
@Mock
private IChannelProducer myWorkChannelProducer;
private BatchJobSender myBatchJobSender;
@Mock
private IJobPersistence myJobInstancePersister;
@Mock
@@ -88,7 +86,7 @@ public class JobCoordinatorImplTest {
@Captor
private ArgumentCaptor<StepExecutionDetails<TestJobParameters, TestJobStep3InputType>> myStep3ExecutionDetailsCaptor;
@Captor
private ArgumentCaptor<JobWorkNotificationJsonMessage> myMessageCaptor;
private ArgumentCaptor<JobWorkNotification> myJobWorkNotificationCaptor;
@Captor
private ArgumentCaptor<JobInstance> myJobInstanceCaptor;
@Captor
@@ -96,7 +94,7 @@ public class JobCoordinatorImplTest {
@BeforeEach
public void beforeEach() {
mySvc = new JobCoordinatorImpl(myWorkChannelProducer, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry);
mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry);
}
@Test
@@ -427,13 +425,14 @@ public class JobCoordinatorImplTest {
assertEquals(PASSWORD_VALUE, myJobInstanceCaptor.getValue().getParameters(TestJobParameters.class).getPassword());
assertEquals(StatusEnum.QUEUED, myJobInstanceCaptor.getValue().getStatus());
verify(myWorkChannelProducer, times(1)).send(myMessageCaptor.capture());
assertNull(myMessageCaptor.getAllValues().get(0).getPayload().getChunkId());
assertEquals(JOB_DEFINITION_ID, myMessageCaptor.getAllValues().get(0).getPayload().getJobDefinitionId());
assertEquals(1, myMessageCaptor.getAllValues().get(0).getPayload().getJobDefinitionVersion());
assertEquals(STEP_1, myMessageCaptor.getAllValues().get(0).getPayload().getTargetStepId());
verify(myBatchJobSender, times(1)).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
assertNull(myJobWorkNotificationCaptor.getAllValues().get(0).getChunkId());
assertEquals(JOB_DEFINITION_ID, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionId());
assertEquals(1, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionVersion());
assertEquals(STEP_1, myJobWorkNotificationCaptor.getAllValues().get(0).getTargetStepId());
verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(JOB_DEFINITION_ID), eq(1), eq(STEP_1), eq(INSTANCE_ID), eq(0), isNull());
BatchWorkChunk expectedWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null);
verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(expectedWorkChunk));
verifyNoMoreInteractions(myJobInstancePersister);
verifyNoMoreInteractions(myStep1Worker);


@@ -0,0 +1,139 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class JobDataSinkTest {
private static final String JOB_DEF_ID = "Jeff";
private static final String JOB_DESC = "Jeff is curious";
private static final int JOB_DEF_VERSION = 1;
private static final int PID_COUNT = 729;
private static final String JOB_INSTANCE_ID = "17";
private static final String CHUNK_ID = "289";
public static final String FIRST_STEP_ID = "firstStep";
public static final String LAST_STEP_ID = "lastStep";
@Mock
private BatchJobSender myBatchJobSender;
@Mock
private IJobPersistence myJobPersistence;
@Captor
private ArgumentCaptor<JobWorkNotification> myJobWorkNotificationCaptor;
@Captor
private ArgumentCaptor<BatchWorkChunk> myBatchWorkChunkCaptor;
@Test
public void test_sink_accept() {
// setup
IJobStepWorker<TestJobParameters, VoidModel, Step1Output> firstStepWorker = new IJobStepWorker<TestJobParameters, VoidModel, Step1Output>() {
@NotNull
@Override
public RunOutcome run(@NotNull StepExecutionDetails<TestJobParameters, VoidModel> theStepExecutionDetails, @NotNull IJobDataSink<Step1Output> theDataSink) throws JobExecutionFailedException {
TestJobParameters params = theStepExecutionDetails.getParameters();
int numPidsToGenerate = Integer.parseInt(params.getParam1());
Step1Output output = new Step1Output();
for (long i = 0; i < numPidsToGenerate; ++i) {
output.addPid(i);
}
theDataSink.accept(output);
return new RunOutcome(numPidsToGenerate);
}
};
IJobStepWorker<TestJobParameters, Step1Output, VoidModel> lastStepWorker = (details, sink) -> {
// Our test does not call this worker
fail();
return null;
};
JobDefinition<TestJobParameters> job = JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEF_ID)
.setJobDescription(JOB_DESC)
.setJobDefinitionVersion(JOB_DEF_VERSION)
.setParametersType(TestJobParameters.class)
.addFirstStep(FIRST_STEP_ID, "s1desc", Step1Output.class, firstStepWorker)
.addLastStep(LAST_STEP_ID, "s2desc", lastStepWorker)
.build();
// execute
// Let's test our first step worker by calling run on it:
when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID);
JobDataSink<Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID);
RunOutcome result = firstStepWorker.run(details, sink);
// verify
assertEquals(PID_COUNT, result.getRecordsProcessed());
// theDataSink.accept(output) called by firstStepWorker above calls two services. Let's validate them both.
verify(myBatchJobSender).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture());
JobWorkNotification notification = myJobWorkNotificationCaptor.getValue();
assertEquals(JOB_DEF_ID, notification.getJobDefinitionId());
assertEquals(JOB_INSTANCE_ID, notification.getInstanceId());
assertEquals(CHUNK_ID, notification.getChunkId());
assertEquals(JOB_DEF_VERSION, notification.getJobDefinitionVersion());
assertEquals(LAST_STEP_ID, notification.getTargetStepId());
BatchWorkChunk batchWorkChunk = myBatchWorkChunkCaptor.getValue();
assertEquals(JOB_DEF_VERSION, batchWorkChunk.jobDefinitionVersion);
assertEquals(0, batchWorkChunk.sequence);
assertEquals(JOB_DEF_ID, batchWorkChunk.jobDefinitionId);
assertEquals(JOB_INSTANCE_ID, batchWorkChunk.instanceId);
assertEquals(LAST_STEP_ID, batchWorkChunk.targetStepId);
Step1Output stepOutput = JsonUtil.deserialize(batchWorkChunk.serializedData, Step1Output.class);
assertThat(stepOutput.getPids(), hasSize(PID_COUNT));
}
private static class Step1Output implements IModelJson {
@JsonProperty("pids")
private List<Long> myPids;
public List<Long> getPids() {
if (myPids == null) {
myPids = new ArrayList<>();
}
return myPids;
}
public Step1Output setPids(List<Long> thePids) {
myPids = thePids;
return this;
}
public void addPid(long thePid) {
getPids().add(thePid);
}
}
}


@@ -0,0 +1,38 @@
package ca.uhn.fhir.batch2.impl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
class JobDefinitionTest {
private static final String JOB_DEF_ID = "Jeff";
private static final String JOB_DESC = "Jeff is curious";
@Test
public void emptyBuilder_fails() {
try {
JobDefinition.newBuilder().build();
fail();
} catch (NullPointerException e) {
assertEquals("No job parameters type was supplied", e.getMessage());
}
}
@Test
public void builder_no_steps() {
try {
JobDefinition.newBuilder()
.setJobDefinitionId(JOB_DEF_ID)
.setJobDescription(JOB_DESC)
.setJobDefinitionVersion(1)
.setParametersType(TestJobParameters.class)
.build();
fail();
} catch (IllegalArgumentException e) {
assertEquals("At least 2 steps must be supplied", e.getMessage());
}
}
}