Fix intermittent failures in batch2 tests (#4643)

* Fix intermittent failures in batch2 tests

* Add changelog

* Test fix

* Resolve intermittent test failure

* Reducer changes

* Test fix

* Test fixes

* Resolve fixme

* Fix changelog
This commit is contained in:
James Agnew 2023-03-13 15:59:05 -04:00 committed by GitHub
parent 5f74396bfb
commit 59123cf0f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 147 additions and 90 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4643
title: "There was a transaction boundary issue in the Batch2 storage layer which resulted in the
framework needing more open database connections than necessary. This has been corrected."

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.system.HapiSystemProperties;
import ca.uhn.fhir.util.ProxyUtil;
import org.springframework.context.annotation.Bean;
@ -42,14 +43,14 @@ import javax.persistence.EntityManager;
public class JpaBatch2Config extends BaseBatch2Config {
@Bean
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager, theEntityManager);
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
}
@Primary
@Bean
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager, theEntityManager);
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
// Avoid H2 synchronization issues caused by
// https://github.com/h2database/h2database/issues/1808
if (HapiSystemProperties.isUnitTestModeEnabled()) {

View File

@ -33,6 +33,7 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
@ -45,12 +46,9 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -76,22 +74,19 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
private final IBatch2JobInstanceRepository myJobInstanceRepository;
private final IBatch2WorkChunkRepository myWorkChunkRepository;
private final TransactionTemplate myTxTemplate;
private final EntityManager myEntityManager;
private final IHapiTransactionService myTransactionService;
/**
* Constructor
*/
public JpaJobPersistenceImpl(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager, EntityManager theEntityManager) {
public JpaJobPersistenceImpl(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
Validate.notNull(theJobInstanceRepository);
Validate.notNull(theWorkChunkRepository);
myJobInstanceRepository = theJobInstanceRepository;
myWorkChunkRepository = theWorkChunkRepository;
myTransactionService = theTransactionService;
myEntityManager = theEntityManager;
// TODO: JA replace with HapiTransactionManager in megascale ticket
myTxTemplate = new TransactionTemplate(theTransactionManager);
myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
}
@Override
@ -186,9 +181,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Nonnull
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Optional<JobInstance> fetchInstance(String theInstanceId) {
return myJobInstanceRepository.findById(theInstanceId).map(this::toInstance);
return myTransactionService
.withSystemRequest()
.execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
}
@Override
@ -332,7 +328,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
myTxTemplate.executeWithoutResult(tx -> {
myTransactionService
.withSystemRequest()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
for (Batch2WorkChunkEntity chunk : chunks) {
theConsumer.accept(toChunk(chunk, theIncludeData));
@ -342,7 +341,10 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public List<String> fetchAllChunkIdsForStepWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) {
return myTxTemplate.execute(tx -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(theInstanceId, theStepId, theStatusEnum));
return myTransactionService
.withSystemRequest()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(theInstanceId, theStepId, theStatusEnum));
}
@Override
@ -426,7 +428,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum) {
int recordsChanged = myJobInstanceRepository.updateInstanceStatus(theInstance, theStatusEnum);
int recordsChanged = myTransactionService
.withSystemRequest()
.execute(()->myJobInstanceRepository.updateInstanceStatus(theInstance, theStatusEnum));
return recordsChanged > 0;
}
@ -451,6 +455,9 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
public void processCancelRequests() {
myTransactionService
.withSystemRequest()
.execute(()->{
Query query = myEntityManager.createQuery(
"UPDATE Batch2JobInstanceEntity b " +
"set myStatus = ca.uhn.fhir.batch2.model.StatusEnum.CANCELLED " +
@ -458,6 +465,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
"AND myStatus IN (:states)");
query.setParameter("states", StatusEnum.CANCELLED.getPriorStates());
query.executeUpdate();
});
}
}

View File

@ -7,6 +7,8 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
@ -16,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
@ -48,8 +51,8 @@ class JpaJobPersistenceImplTest {
IBatch2JobInstanceRepository myJobInstanceRepository;
@Mock
IBatch2WorkChunkRepository myWorkChunkRepository;
@Mock
PlatformTransactionManager myTxManager;
@Spy
IHapiTransactionService myTxManager = new NonTransactionalHapiTransactionService();
@InjectMocks
JpaJobPersistenceImpl mySvc;

View File

@ -8,6 +8,7 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.delete.job.ReindexTestHelper;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
@ -27,10 +28,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.model.util.JpaConstants.DEFAULT_PARTITION_NAME;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -156,11 +160,22 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
logAllTokenIndexes();
// validate
runInTransaction(()->{
long indexedSps = myResourceIndexedSearchParamTokenDao
.findAll()
.stream()
.filter(t->t.getParamName().equals("alleleName"))
.count();
assertEquals(1, indexedSps, ()->"Token indexes:\n * " + myResourceIndexedSearchParamTokenDao.findAll().stream().filter(t->t.getParamName().equals("alleleName")).map(ResourceIndexedSearchParamToken::toString).collect(Collectors.joining("\n * ")));
});
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
// Only the one in the first tenant should be indexed
myTenantClientInterceptor.setTenantId(TENANT_A);
MatcherAssert.assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(1));
await().until(() -> reindexTestHelper.getAlleleObservationIds(myClient), hasSize(1));
assertEquals(obsFinalA.getIdPart(), alleleObservationIds.get(0));
myTenantClientInterceptor.setTenantId(TENANT_B);
MatcherAssert.assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
@ -180,9 +195,17 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
runInTransaction(()->{
long indexedSps = myResourceIndexedSearchParamTokenDao
.findAll()
.stream()
.filter(t->t.getParamName().equals("alleleName"))
.count();
assertEquals(3, indexedSps, ()->"Token indexes:\n * " + myResourceIndexedSearchParamTokenDao.findAll().stream().filter(t->t.getParamName().equals("alleleName")).map(ResourceIndexedSearchParamToken::toString).collect(Collectors.joining("\n * ")));
});
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
MatcherAssert.assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(1));
await().until(() -> reindexTestHelper.getAlleleObservationIds(myClient), hasSize(1));
}
@Test

View File

@ -95,7 +95,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
/**
@ -154,7 +153,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
*/
boolean markInstanceAsCompleted(String theInstanceId);
@Transactional(propagation = Propagation.REQUIRES_NEW)
boolean markInstanceAsStatus(String theInstance, StatusEnum theStatusEnum);
/**
@ -166,7 +164,6 @@ public interface IJobPersistence extends IWorkChunkPersistence {
void updateInstanceUpdateTime(String theInstanceId);
@Transactional
void processCancelRequests();
}

View File

@ -5,7 +5,6 @@ import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import org.springframework.transaction.annotation.Transactional;
import java.util.Iterator;
import java.util.List;
@ -15,6 +14,7 @@ import java.util.stream.Stream;
/**
* Work Chunk api, implementing the WorkChunk state machine.
* Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
*
* @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
*/
public interface IWorkChunkPersistence {
@ -34,7 +34,6 @@ public interface IWorkChunkPersistence {
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.
*/
@Transactional
String storeWorkChunk(BatchWorkChunk theBatchWorkChunk);
/**
@ -45,18 +44,16 @@ public interface IWorkChunkPersistence {
* @param theChunkId The ID from {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)}
* @return The WorkChunk or empty if no chunk with that id exists in the QUEUED or ERRORRED states
*/
@Transactional
Optional<WorkChunk> fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId);
/**
* Marks a given chunk as having errored (ie, may be recoverable)
*
* <p>
* Returns the work chunk.
*
* @param theParameters - the parameters for marking the workchunk with error
* @return - workchunk optional, if available.
*/
@Transactional
WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters);
/**
@ -64,25 +61,24 @@ public interface IWorkChunkPersistence {
*
* @param theChunkId The chunk ID
*/
@Transactional
void markWorkChunkAsFailed(String theChunkId, String theErrorMessage);
/**
* Report success and complete the chunk.
*
* @param theEvent with record and error count
*/
@Transactional
void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent);
/**
* Marks all work chunks with the provided status and erases the data
*
* @param theInstanceId - the instance id
* @param theChunkIds - the ids of work chunks being reduced to single chunk
* @param theStatus - the status to mark
* @param theErrorMsg - error message (if status warrants it)
*/
@Transactional
void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg);
@ -96,9 +92,9 @@ public interface IWorkChunkPersistence {
List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex);
/**
* Fetch all chunks for a given instance.
*
* @param theInstanceId - instance id
* @param theWithData - whether or not to include the data
* @return - an iterator for fetching work chunks
@ -106,9 +102,9 @@ public interface IWorkChunkPersistence {
Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
/**
* Fetch all chunks with data for a given instance for a given step id
*
* @return - a stream for fetching work chunks
*/
Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
@ -123,5 +119,4 @@ public interface IWorkChunkPersistence {
List<String> fetchAllChunkIdsForStepWithStatus(String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum);
}

View File

@ -73,8 +73,8 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
private final IHapiTransactionService myTransactionService;
private final Semaphore myCurrentlyExecuting = new Semaphore(1);
private final AtomicReference<String> myCurrentlyFinalizingInstanceId = new AtomicReference<>();
private Timer myHeartbeatTimer;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private Timer myHeartbeatTimer;
/**
@ -91,15 +91,17 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
@EventListener(ContextRefreshedEvent.class)
public void start() {
if (myHeartbeatTimer == null) {
myHeartbeatTimer = new Timer("batch2-reducer-heartbeat");
myHeartbeatTimer.schedule(new HeartbeatTimerTask(), DateUtils.MILLIS_PER_MINUTE, DateUtils.MILLIS_PER_MINUTE);
}
}
private void runHeartbeat() {
String currentlyFinalizingInstanceId = myCurrentlyFinalizingInstanceId.get();
if (currentlyFinalizingInstanceId != null) {
ourLog.info("Running heartbeat for instance: {}", currentlyFinalizingInstanceId);
executeInTransactionWithSynchronization(()->{
executeInTransactionWithSynchronization(() -> {
myJobPersistence.updateInstanceUpdateTime(currentlyFinalizingInstanceId);
return null;
});
@ -108,7 +110,10 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
@EventListener(ContextClosedEvent.class)
public void shutdown() {
if (myHeartbeatTimer != null) {
myHeartbeatTimer.cancel();
myHeartbeatTimer = null;
}
}
@ -136,6 +141,8 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
myInstanceIdToJobWorkCursor.remove(instanceId);
}
} catch (Exception e) {
ourLog.error("Failed to execute reducer pass", e);
} finally {
myCurrentlyFinalizingInstanceId.set(null);
myCurrentlyExecuting.release();
@ -148,14 +155,15 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
JobDefinitionStep<PT, IT, OT> step = theJobWorkCursor.getCurrentStep();
JobInstance instance = executeInTransactionWithSynchronization(() -> {
JobInstance currentInstance = myJobPersistence.fetchInstance(theInstanceId).orElseThrow(() -> new InternalErrorException("Unknown currentInstance: " + theInstanceId));
JobInstance instance = executeInTransactionWithSynchronization(() ->
myJobPersistence.fetchInstance(theInstanceId).orElseThrow(() -> new InternalErrorException("Unknown instance: " + theInstanceId)));
boolean shouldProceed = false;
switch (currentInstance.getStatus()) {
switch (instance.getStatus()) {
case IN_PROGRESS:
case ERRORED:
if (myJobPersistence.markInstanceAsStatus(currentInstance.getInstanceId(), StatusEnum.FINALIZE)) {
ourLog.info("Job instance {} has been set to FINALIZE state - Beginning reducer step", currentInstance.getInstanceId());
if (myJobPersistence.markInstanceAsStatus(instance.getInstanceId(), StatusEnum.FINALIZE)) {
ourLog.info("Job instance {} has been set to FINALIZE state - Beginning reducer step", instance.getInstanceId());
shouldProceed = true;
}
break;
@ -172,15 +180,9 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS
"JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun!"
+ " This could be a long running reduction job resulting in the processed msg not being acknowledge,"
+ " or the result of a failed process or server restarting.",
currentInstance.getInstanceId(),
currentInstance.getStatus().name()
instance.getInstanceId(),
instance.getStatus().name()
);
return null;
}
return currentInstance;
});
if (instance == null) {
return new ReductionStepChunkProcessingResponse(false);
}

View File

@ -89,6 +89,11 @@ public class HapiTransactionService implements IHapiTransactionService {
return new ExecutionBuilder(theRequestDetails);
}
@Override
public IExecutionBuilder withSystemRequest() {
return new ExecutionBuilder(null);
}
/**
* @deprecated Use {@link #withRequest(RequestDetails)} with fluent call instead

View File

@ -49,12 +49,19 @@ public interface IHapiTransactionService {
*/
IExecutionBuilder withRequest(@Nullable RequestDetails theRequestDetails);
/**
* Fluent builder for internal system requests with no external
* RequestDetails associated.
*/
IExecutionBuilder withSystemRequest();
/**
* @deprecated It is highly recommended to use {@link #withRequest(RequestDetails)} instead of this method, for increased visibility.
*/
@Deprecated
<T> T withRequest(@Nullable RequestDetails theRequestDetails, @Nullable TransactionDetails theTransactionDetails, @Nonnull Propagation thePropagation, @Nonnull Isolation theIsolation, @Nonnull ICallable<T> theCallback);
interface IExecutionBuilder {
IExecutionBuilder withIsolation(Isolation theIsolation);

View File

@ -34,6 +34,8 @@ import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionException;
import javax.annotation.Nonnull;
@ -67,6 +69,15 @@ class RetryingMessageHandlerWrapper implements MessageHandler {
if (theThrowable instanceof BaseUnrecoverableRuntimeException) {
theContext.setExhaustedOnly();
}
if (theThrowable instanceof CannotCreateTransactionException) {
/*
* This exception means that we can't open a transaction, which
* means the EntityManager is closed. This can happen if we are shutting
* down while there is still a message in the queue - No sense
* retrying indefinitely in that case
*/
theContext.setExhaustedOnly();
}
}
};
retryTemplate.setListeners(new RetryListener[]{retryListener});