This commit is contained in:
nathaniel.doef 2023-02-07 10:33:08 -05:00
parent 34e27c4313
commit 7da52ef537
3 changed files with 24 additions and 25 deletions

View File

@ -39,7 +39,6 @@ import ca.uhn.fhir.util.Logs;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
@ -51,7 +50,6 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
@ -68,9 +66,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class JpaJobPersistenceImpl implements IJobPersistence {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private EntityManager myEntityManager;
private final IBatch2JobInstanceRepository myJobInstanceRepository;
private final IBatch2WorkChunkRepository myWorkChunkRepository;
private final TransactionTemplate myTxTemplate;
@ -313,6 +308,16 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return myTxTemplate.execute(tx -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(theInstanceId, theStepId, theStatusEnum));
}
private void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
myTxTemplate.executeWithoutResult(tx -> {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksForStep(PageRequest.of(thePageIndex, thePageSize), theInstanceId, theStepId);
for (Batch2WorkChunkEntity chunk : chunks) {
theConsumer.accept(toChunk(chunk, true));
}
});
}
/**
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/
@ -321,17 +326,14 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
}
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#fetchAllWorkChunksForStepStream(String, String)}
* Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Deprecated
public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
List<WorkChunk> workChunks = new ArrayList<>();
try(Stream<Batch2WorkChunkEntity> entities = myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId)){
entities.forEach((entity) -> {
workChunks.add(toChunk(entity, true));
myEntityManager.detach(entity);
});
return workChunks.iterator();
}
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunksForStep(theInstanceId, theStepId, theBatchSize, thePageIndex, theConsumer));
}
@Override

View File

@ -40,6 +40,13 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
/**
* Deprecated, use {@link ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository#fetchChunksForStep(String, String)}
*/
@Deprecated
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunksForStep(Pageable thePageRequest, @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
Stream<Batch2WorkChunkEntity> fetchChunksForStep(@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);

View File

@ -35,10 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public interface IJobPersistence {
@ -209,14 +206,7 @@ public interface IJobPersistence {
* @param theStepId
* @return - a stream for fetching work chunks
*/
default Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId){
Iterator<WorkChunk> workChunkIterator = fetchAllWorkChunksForStepIterator(theInstanceId, theStepId);
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(workChunkIterator, Spliterator.ORDERED),
false
);
}
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
/**
* Update the stored instance. If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater}