From 07e90d9f4e69a7a233956ac842028d45c759ca5f Mon Sep 17 00:00:00 2001 From: leif stawnyczy Date: Fri, 5 Jul 2024 11:55:32 -0400 Subject: [PATCH] batch subscription processing --- ...bscription-processing-will-be-batched.yaml | 7 ++ .../jpa/dao/data/IResourceModifiedDao.java | 7 +- ...urceModifiedMessagePersistenceSvcImpl.java | 8 ++- .../IPersistedResourceModifiedMessage.java | 4 ++ .../model/entity/ResourceModifiedEntity.java | 1 + .../AsyncResourceModifiedSubmitterSvc.java | 40 +++++++---- .../svc/ResourceModifiedSubmitterSvc.java | 1 + .../AsyncSubscriptionMessageSubmissionIT.java | 69 +++++++++++++++++-- .../message/MessageSubscriptionR4Test.java | 18 +++-- ...ResourceModifiedMessagePersistenceSvc.java | 7 +- 10 files changed, 130 insertions(+), 32 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6074-subscription-processing-will-be-batched.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6074-subscription-processing-will-be-batched.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6074-subscription-processing-will-be-batched.yaml new file mode 100644 index 00000000000..dc25e74a46d --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_4_0/6074-subscription-processing-will-be-batched.yaml @@ -0,0 +1,7 @@ +--- +type: fix +issue: 6074 +title: "Before being processed, subscriptions would be read out of the database all + at once. This lead to massive memory consumption if there were a lot of them. + This has now been changed to use batching as a means of mitigating this problem. +" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceModifiedDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceModifiedDao.java index 6be1c3d5e99..c8d7c8e2193 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceModifiedDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceModifiedDao.java @@ -23,18 +23,19 @@ package ca.uhn.fhir.jpa.dao.data; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK; import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; -import java.util.List; - public interface IResourceModifiedDao extends JpaRepository, IHapiFhirJpaRepository { + @Query("SELECT r FROM ResourceModifiedEntity r ORDER BY r.myCreatedTime ASC") - List findAllOrderedByCreatedTime(); + Page findAllOrderedByCreatedTime(Pageable thePage); @Modifying @Query("delete from ResourceModifiedEntity r where r.myResourceModifiedEntityPK =:pk") diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java index 26793f0b371..4f29193f5e9 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/ResourceModifiedMessagePersistenceSvcImpl.java @@ -43,6 +43,8 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import java.util.Date; import java.util.List; @@ -82,8 +84,10 @@ public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModif } @Override - public List findAllOrderedByCreatedTime() { - return myHapiTransactionService.withSystemRequest().execute(myResourceModifiedDao::findAllOrderedByCreatedTime); + public Page findAllOrderedByCreatedTime(Pageable thePageable) { + return myHapiTransactionService.withSystemRequest().execute(() -> { + return myResourceModifiedDao.findAllOrderedByCreatedTime(thePageable); + }); } @Override diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/IPersistedResourceModifiedMessage.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/IPersistedResourceModifiedMessage.java index 292972d8cef..eed026d3fec 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/IPersistedResourceModifiedMessage.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/IPersistedResourceModifiedMessage.java @@ -19,9 +19,13 @@ */ package ca.uhn.fhir.jpa.model.entity; +import java.util.Date; + public interface IPersistedResourceModifiedMessage { IPersistedResourceModifiedMessagePK getPersistedResourceModifiedMessagePk(); String getResourceType(); + + Date getCreatedTime(); } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ResourceModifiedEntity.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ResourceModifiedEntity.java index c6ca5dc45ec..84639f8254d 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ResourceModifiedEntity.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ResourceModifiedEntity.java @@ -76,6 +76,7 @@ public class ResourceModifiedEntity implements IPersistedResourceModifiedMessage return this; } + @Override public Date getCreatedTime() { return myCreatedTime; } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/async/AsyncResourceModifiedSubmitterSvc.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/async/AsyncResourceModifiedSubmitterSvc.java index a6228303bcd..f537a8f98cb 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/async/AsyncResourceModifiedSubmitterSvc.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/async/AsyncResourceModifiedSubmitterSvc.java @@ -23,10 +23,11 @@ package ca.uhn.fhir.jpa.subscription.async; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries; import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.List; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; /** * The purpose of this service is to submit messages to the processing pipeline for which previous attempts at @@ -36,6 +37,8 @@ import java.util.List; public class AsyncResourceModifiedSubmitterSvc { private static final Logger ourLog = LoggerFactory.getLogger(AsyncResourceModifiedSubmitterSvc.class); + public static final int MAX_LIMIT = 1000; + private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; private final IResourceModifiedConsumerWithRetries myResourceModifiedConsumer; @@ -47,21 +50,32 @@ public class AsyncResourceModifiedSubmitterSvc { } public void runDeliveryPass() { + boolean hasMoreToFetch = false; + int limit = getLimit(); + do { + // we always take the 0th page, because we're deleting the elements as we process them + Page persistedResourceModifiedMsgsPage = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime( + PageRequest.of(0, limit)); + ourLog.debug( + "Attempting to submit {} resources to consumer channel.", persistedResourceModifiedMsgsPage.getTotalElements()); - List allPersistedResourceModifiedMessages = - myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(); - ourLog.debug( - "Attempting to submit {} resources to consumer channel.", allPersistedResourceModifiedMessages.size()); + hasMoreToFetch = persistedResourceModifiedMsgsPage.hasNext(); - for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage : - allPersistedResourceModifiedMessages) { - - boolean wasProcessed = + for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage : persistedResourceModifiedMsgsPage) { + boolean wasProcessed = myResourceModifiedConsumer.submitPersisedResourceModifiedMessage(persistedResourceModifiedMessage); - if (!wasProcessed) { - break; + if (!wasProcessed) { + // we're not fetching anymore no matter what + hasMoreToFetch = false; + break; + } } - } + } while (hasMoreToFetch); + } + + @VisibleForTesting + public static int getLimit() { + return MAX_LIMIT; } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java index 2ed15411491..1c8d419502f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/svc/ResourceModifiedSubmitterSvc.java @@ -143,6 +143,7 @@ public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, boolean processed = true; ResourceModifiedMessage resourceModifiedMessage = null; + // TODO - batch these try { // delete the entry to lock the row to ensure unique processing boolean wasDeleted = deletePersistedResourceModifiedMessage( diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/async/AsyncSubscriptionMessageSubmissionIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/async/AsyncSubscriptionMessageSubmissionIT.java index 29e6a3ca26e..dd6d1fbc1cc 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/async/AsyncSubscriptionMessageSubmissionIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/async/AsyncSubscriptionMessageSubmissionIT.java @@ -1,11 +1,11 @@ package ca.uhn.fhir.jpa.subscription.async; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; -import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; +import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao; import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; -import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor; +import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK; +import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity; +import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; @@ -14,25 +14,39 @@ import ca.uhn.fhir.jpa.subscription.message.TestQueueConsumerHandler; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor; +import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.fasterxml.jackson.core.JsonProcessingException; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r4.model.Coding; import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Subscription; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import org.springframework.data.domain.Pageable; import org.springframework.test.context.ContextConfiguration; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; @ContextConfiguration(classes = {AsyncSubscriptionMessageSubmissionIT.SpringConfig.class}) public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Test { + private static final Logger ourLog = LoggerFactory.getLogger(AsyncSubscriptionMessageSubmissionIT.class); @SpyBean IResourceModifiedConsumer myResourceModifiedConsumer; @@ -49,6 +63,9 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber; private TestQueueConsumerHandler myQueueConsumerHandler; + @Autowired + private IResourceModifiedDao myResourceModifiedDao; + @AfterEach public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() { myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null); @@ -73,6 +90,46 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes assertFalse(mySubscriptionMatcherInterceptor instanceof SynchronousSubscriptionMatcherInterceptor); } + @Test + public void runDeliveryPass_withManyResources_isBatchedAndKeepsResourceUsageDown() throws JsonProcessingException, InterruptedException { + // setup + myLogbackTestExtension.setUp(Level.DEBUG); + + String resourceType = "Patient"; + int factor = 5; + int numberOfResourcesToCreate = factor * AsyncResourceModifiedSubmitterSvc.MAX_LIMIT; + + ResourceModifiedEntity entity = new ResourceModifiedEntity(); + entity.setResourceType(resourceType); + PersistedResourceModifiedMessageEntityPK rpm = new PersistedResourceModifiedMessageEntityPK(); + rpm.setResourceVersion("1"); + entity.setResourceModifiedEntityPK(rpm); + + // we reuse the same exact msg content to avoid + // the slowdown of serializing it over and over + SystemRequestDetails details = new SystemRequestDetails(); + // create a large number of resources + for (int i = 0; i < numberOfResourcesToCreate; i++) { + Patient resource = new Patient(); + resource.setId(resourceType + "/" + (1 + i)); + myPatientDao.create(resource, details); + } + + assertEquals(numberOfResourcesToCreate, myResourceModifiedDao.count()); + + // test + myAsyncResourceModifiedSubmitterSvc.runDeliveryPass(); + + // verification + waitForQueueToDrain(); + assertCountOfResourcesNeedingSubmission(0); + + List events = myLogbackTestExtension.filterLoggingEventsWithPredicate(e -> { + return e.getLevel() == Level.DEBUG && e.getFormattedMessage().contains("Attempting to submit"); + }); + assertEquals(factor, events.size()); + } + @Test // the purpose of this test is to assert that a resource matching a given subscription is // delivered asynchronously to the subscription processing pipeline. @@ -107,7 +164,9 @@ public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Tes } private void assertCountOfResourcesNeedingSubmission(int theExpectedCount) { - assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime()).hasSize(theExpectedCount); + assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime( + Pageable.unpaged())) + .hasSize(theExpectedCount); } private Subscription createAndSubmitSubscriptionWithCriteria(String theCriteria) { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java index e1f586e62bf..f5434b738df 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java @@ -1,9 +1,9 @@ package ca.uhn.fhir.jpa.subscription.message; -import static org.junit.jupiter.api.Assertions.assertEquals; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao; +import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK; @@ -13,7 +13,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; -import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.rest.client.api.Header; import ca.uhn.fhir.rest.client.api.IGenericClient; @@ -37,9 +36,12 @@ import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; +import java.util.Date; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,6 +50,7 @@ import static ca.uhn.fhir.jpa.model.util.JpaConstants.HEADER_META_SNAPSHOT_MODE; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -196,6 +199,7 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test { @Test public void testMethodFindAllOrdered_willReturnAllPersistedResourceModifiedMessagesOrderedByCreatedTime(){ + Date now = new Date(); mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); // given @@ -209,11 +213,12 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test { IPersistedResourceModifiedMessage organizationPersistedMessage = myResourceModifiedMessagePersistenceSvc.persist(organizationResourceModifiedMessage); // when - List allPersisted = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(); + Page allPersisted = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime( + Pageable.unpaged() + ); // then - assertOnPksAndOrder(allPersisted, List.of(patientPersistedMessage, organizationPersistedMessage)); - + assertOnPksAndOrder(allPersisted.stream().toList(), List.of(patientPersistedMessage, organizationPersistedMessage)); } @Test @@ -232,7 +237,8 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test { // then assertTrue(wasDeleted); - assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime()).hasSize(0); + assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(Pageable.unpaged())) + .hasSize(0); } @Test diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java index 125670413a5..b733970f90c 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/subscription/api/IResourceModifiedMessagePersistenceSvc.java @@ -23,8 +23,9 @@ package ca.uhn.fhir.subscription.api; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; -import java.util.List; import java.util.Optional; /** @@ -38,10 +39,10 @@ public interface IResourceModifiedMessagePersistenceSvc { /** * Find all persistedResourceModifiedMessage sorted by ascending created dates (oldest to newest). - * + * @param thePageable Page request * @return A sorted list of persistedResourceModifiedMessage needing submission. */ - List findAllOrderedByCreatedTime(); + Page findAllOrderedByCreatedTime(Pageable thePageable); /** * Delete a persistedResourceModifiedMessage by its primary key.