batch subscription processing

This commit is contained in:
leif stawnyczy 2024-07-05 11:55:32 -04:00
parent 371aad2b08
commit 07e90d9f4e
10 changed files with 130 additions and 32 deletions

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 6074
title: "Before being processed, subscriptions would be read out of the database all
at once. This lead to massive memory consumption if there were a lot of them.
This has now been changed to use batching as a means of mitigating this problem.
"

View File

@ -23,18 +23,19 @@ package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
public interface IResourceModifiedDao
extends JpaRepository<ResourceModifiedEntity, PersistedResourceModifiedMessageEntityPK>,
IHapiFhirJpaRepository {
@Query("SELECT r FROM ResourceModifiedEntity r ORDER BY r.myCreatedTime ASC")
List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime();
Page<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime(Pageable thePage);
@Modifying
@Query("delete from ResourceModifiedEntity r where r.myResourceModifiedEntityPK =:pk")

View File

@ -43,6 +43,8 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.Date;
import java.util.List;
@ -82,8 +84,10 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif
}
@Override
public List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime() {
return myHapiTransactionService.withSystemRequest().execute(myResourceModifiedDao::findAllOrderedByCreatedTime);
public Page<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime(Pageable thePageable) {
return myHapiTransactionService.withSystemRequest().execute(() -> {
return myResourceModifiedDao.findAllOrderedByCreatedTime(thePageable);
});
}
@Override

View File

@ -19,9 +19,13 @@
*/
package ca.uhn.fhir.jpa.model.entity;
import java.util.Date;
public interface IPersistedResourceModifiedMessage {
IPersistedResourceModifiedMessagePK getPersistedResourceModifiedMessagePk();
String getResourceType();
Date getCreatedTime();
}

View File

@ -76,6 +76,7 @@ public class ResourceModifiedEntity implements IPersistedResourceModifiedMessage
return this;
}
@Override
public Date getCreatedTime() {
return myCreatedTime;
}

View File

@ -23,10 +23,11 @@ package ca.uhn.fhir.jpa.subscription.async;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
/**
* The purpose of this service is to submit messages to the processing pipeline for which previous attempts at
@ -36,6 +37,8 @@ import java.util.List;
public class AsyncResourceModifiedSubmitterSvc {
private static final Logger ourLog = LoggerFactory.getLogger(AsyncResourceModifiedSubmitterSvc.class);
public static final int MAX_LIMIT = 1000;
private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
private final IResourceModifiedConsumerWithRetries myResourceModifiedConsumer;
@ -47,21 +50,32 @@ public class AsyncResourceModifiedSubmitterSvc {
}
public void runDeliveryPass() {
boolean hasMoreToFetch = false;
int limit = getLimit();
do {
// we always take the 0th page, because we're deleting the elements as we process them
Page<IPersistedResourceModifiedMessage> persistedResourceModifiedMsgsPage = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(
PageRequest.of(0, limit));
ourLog.debug(
"Attempting to submit {} resources to consumer channel.", persistedResourceModifiedMsgsPage.getTotalElements());
List<IPersistedResourceModifiedMessage> allPersistedResourceModifiedMessages =
myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime();
ourLog.debug(
"Attempting to submit {} resources to consumer channel.", allPersistedResourceModifiedMessages.size());
hasMoreToFetch = persistedResourceModifiedMsgsPage.hasNext();
for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage :
allPersistedResourceModifiedMessages) {
boolean wasProcessed =
for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage : persistedResourceModifiedMsgsPage) {
boolean wasProcessed =
myResourceModifiedConsumer.submitPersisedResourceModifiedMessage(persistedResourceModifiedMessage);
if (!wasProcessed) {
break;
if (!wasProcessed) {
// we're not fetching anymore no matter what
hasMoreToFetch = false;
break;
}
}
}
} while (hasMoreToFetch);
}
@VisibleForTesting
public static int getLimit() {
return MAX_LIMIT;
}
}

View File

@ -143,6 +143,7 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer,
boolean processed = true;
ResourceModifiedMessage resourceModifiedMessage = null;
// TODO - batch these
try {
// delete the entry to lock the row to ensure unique processing
boolean wasDeleted = deletePersistedResourceModifiedMessage(

View File

@ -1,11 +1,11 @@
package ca.uhn.fhir.jpa.subscription.async;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
@ -14,25 +14,39 @@ import ca.uhn.fhir.jpa.subscription.message.TestQueueConsumerHandler;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.domain.Pageable;
import org.springframework.test.context.ContextConfiguration;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ContextConfiguration(classes = {AsyncSubscriptionMessageSubmissionIT.SpringConfig.class})
public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(AsyncSubscriptionMessageSubmissionIT.class);
@SpyBean
IResourceModifiedConsumer myResourceModifiedConsumer;
@ -49,6 +63,9 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
private TestQueueConsumerHandler<ResourceModifiedJsonMessage> myQueueConsumerHandler;
@Autowired
private IResourceModifiedDao myResourceModifiedDao;
@AfterEach
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
@ -73,6 +90,46 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes
assertFalse(mySubscriptionMatcherInterceptor instanceof SynchronousSubscriptionMatcherInterceptor);
}
@Test
public void runDeliveryPass_withManyResources_isBatchedAndKeepsResourceUsageDown() throws JsonProcessingException, InterruptedException {
// setup
myLogbackTestExtension.setUp(Level.DEBUG);
String resourceType = "Patient";
int factor = 5;
int numberOfResourcesToCreate = factor * AsyncResourceModifiedSubmitterSvc.MAX_LIMIT;
ResourceModifiedEntity entity = new ResourceModifiedEntity();
entity.setResourceType(resourceType);
PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK();
rpm.setResourceVersion("1");
entity.setResourceModifiedEntityPK(rpm);
// we reuse the same exact msg content to avoid
// the slowdown of serializing it over and over
SystemRequestDetails details = new SystemRequestDetails();
// create a large number of resources
for (int i = 0; i < numberOfResourcesToCreate; i++) {
Patient resource = new Patient();
resource.setId(resourceType + "/" + (1 + i));
myPatientDao.create(resource, details);
}
assertEquals(numberOfResourcesToCreate, myResourceModifiedDao.count());
// test
myAsyncResourceModifiedSubmitterSvc.runDeliveryPass();
// verification
waitForQueueToDrain();
assertCountOfResourcesNeedingSubmission(0);
List<ILoggingEvent> events = myLogbackTestExtension.filterLoggingEventsWithPredicate(e -> {
return e.getLevel() == Level.DEBUG && e.getFormattedMessage().contains("Attempting to submit");
});
assertEquals(factor, events.size());
}
@Test
// the purpose of this test is to assert that a resource matching a given subscription is
// delivered asynchronously to the subscription processing pipeline.
@ -107,7 +164,9 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes
}
private void assertCountOfResourcesNeedingSubmission(int theExpectedCount) {
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime()).hasSize(theExpectedCount);
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(
Pageable.unpaged()))
.hasSize(theExpectedCount);
}
private Subscription createAndSubmitSubscriptionWithCriteria(String theCriteria) {

View File

@ -1,9 +1,9 @@
package ca.uhn.fhir.jpa.subscription.message;
import static org.junit.jupiter.api.Assertions.assertEquals;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
@ -13,7 +13,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
@ -37,9 +36,12 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -48,6 +50,7 @@ import static ca.uhn.fhir.jpa.model.util.JpaConstants.HEADER_META_SNAPSHOT_MODE;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -196,6 +199,7 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
@Test
public void testMethodFindAllOrdered_willReturnAllPersistedResourceModifiedMessagesOrderedByCreatedTime(){
Date now = new Date();
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
// given
@ -209,11 +213,12 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
IPersistedResourceModifiedMessage organizationPersistedMessage = myResourceModifiedMessagePersistenceSvc.persist(organizationResourceModifiedMessage);
// when
List<IPersistedResourceModifiedMessage> allPersisted = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime();
Page<IPersistedResourceModifiedMessage> allPersisted = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(
Pageable.unpaged()
);
// then
assertOnPksAndOrder(allPersisted, List.of(patientPersistedMessage, organizationPersistedMessage));
assertOnPksAndOrder(allPersisted.stream().toList(), List.of(patientPersistedMessage, organizationPersistedMessage));
}
@Test
@ -232,7 +237,8 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
// then
assertTrue(wasDeleted);
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime()).hasSize(0);
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(Pageable.unpaged()))
.hasSize(0);
}
@Test

View File

@ -23,8 +23,9 @@ package ca.uhn.fhir.subscription.api;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.List;
import java.util.Optional;
/**
@ -38,10 +39,10 @@ public interface IResourceModifiedMessagePersistenceSvc {
/**
* Find all persistedResourceModifiedMessage sorted by ascending created dates (oldest to newest).
*
* @param thePageable Page request
* @return A sorted list of persistedResourceModifiedMessage needing submission.
*/
List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime();
Page<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime(Pageable thePageable);
/**
* Delete a persistedResourceModifiedMessage by its primary key.