Clean stale search results more aggressively. (#5396)

Use bulk DMA statements when cleaning the search cache.
The cleaner job now works as long as possible until a deadline based on the scheduling frequency.
This commit is contained in:
michaelabuckley 2023-10-26 16:37:03 -04:00 committed by GitHub
parent cf3cf2547d
commit 757ef72279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 375 additions and 176 deletions

View File

@ -1,6 +1,6 @@
--- ---
type: perf type: perf
issue: 5387 issue: 5387
title: "Enable the cache when for some requests when a consent interceptor is active. title: "Enable the search cache for some requests even when a consent interceptor is active.
If no consent server uses canSeeResource (i.e. shouldProcessCanSeeResource() returns false); If no consent service uses canSeeResource (i.e. shouldProcessCanSeeResource() returns false);
or startOperation() returns AUTHORIZED; then cached results are safe." or startOperation() returns AUTHORIZED; then the search cache is enabled."

View File

@ -0,0 +1,5 @@
---
type: perf
issue: 5395
title: "The background activity that clears stale search results now has higher throughput.
Busy servers should no longer accumulate dead stale search results."

View File

@ -27,9 +27,10 @@ The ConsentInterceptor requires a user-supplied instance of the [IConsentService
## Performance and Privacy ## Performance and Privacy
The `canSeeResource()` operation requires inspecting every resource during a search and editing the results. Filtering search results in `canSeeResource()` requires inspecting every resource during a search and editing the results.
This is slower than the normal path, and will block the use of the search cache. This is slower than the normal path, and will prevent the reuse of the results from the search cache.
The `willSeeResource()` check is safe for cached searches, but removed resources may be 'visible' as holes in returned bundles. The `willSeeResource()` operation supports reusing cached search results, but removed resources may be 'visible' as holes in returned bundles.
If this information leak is acceptable, then the search cache can be enabled by blocking the use of `canSeeResource()` by returning `false` from `processCanSeeResource()`. Disabling `canSeeResource()` by returning `false` from `processCanSeeResource()` will enable the search cache.

View File

@ -20,8 +20,7 @@
package ca.uhn.fhir.jpa.dao.data; package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.entity.Search;
import org.springframework.data.domain.Pageable; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.domain.Slice;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
@ -30,6 +29,8 @@ import org.springframework.data.repository.query.Param;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public interface ISearchDao extends JpaRepository<Search, Long>, IHapiFhirJpaRepository { public interface ISearchDao extends JpaRepository<Search, Long>, IHapiFhirJpaRepository {
@ -38,10 +39,12 @@ public interface ISearchDao extends JpaRepository<Search, Long>, IHapiFhirJpaRep
@Query( @Query(
"SELECT s.myId FROM Search s WHERE (s.myCreated < :cutoff) AND (s.myExpiryOrNull IS NULL OR s.myExpiryOrNull < :now) AND (s.myDeleted IS NULL OR s.myDeleted = FALSE)") "SELECT s.myId FROM Search s WHERE (s.myCreated < :cutoff) AND (s.myExpiryOrNull IS NULL OR s.myExpiryOrNull < :now) AND (s.myDeleted IS NULL OR s.myDeleted = FALSE)")
Slice<Long> findWhereCreatedBefore(@Param("cutoff") Date theCutoff, @Param("now") Date theNow, Pageable thePage); Stream<Long> findWhereCreatedBefore(@Param("cutoff") Date theCutoff, @Param("now") Date theNow);
@Query("SELECT s.myId FROM Search s WHERE s.myDeleted = TRUE") @Query("SELECT new ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize(" + "s.myId, "
Slice<Long> findDeleted(Pageable thePage); + "(select max(sr.myOrder) as maxOrder from SearchResult sr where sr.mySearchPid = s.myId)) "
+ "FROM Search s WHERE s.myDeleted = TRUE")
Stream<SearchIdAndResultSize> findDeleted();
@Query( @Query(
"SELECT s FROM Search s WHERE s.myResourceType = :type AND s.mySearchQueryStringHash = :hash AND (s.myCreated > :cutoff) AND s.myDeleted = FALSE AND s.myStatus <> 'FAILED'") "SELECT s FROM Search s WHERE s.myResourceType = :type AND s.mySearchQueryStringHash = :hash AND (s.myCreated > :cutoff) AND s.myDeleted = FALSE AND s.myStatus <> 'FAILED'")
@ -54,10 +57,15 @@ public interface ISearchDao extends JpaRepository<Search, Long>, IHapiFhirJpaRep
int countDeleted(); int countDeleted();
@Modifying @Modifying
@Query("UPDATE Search s SET s.myDeleted = :deleted WHERE s.myId = :pid") @Query("UPDATE Search s SET s.myDeleted = :deleted WHERE s.myId in (:pids)")
void updateDeleted(@Param("pid") Long thePid, @Param("deleted") boolean theDeleted); @CanIgnoreReturnValue
int updateDeleted(@Param("pids") Set<Long> thePid, @Param("deleted") boolean theDeleted);
@Modifying @Modifying
@Query("DELETE FROM Search s WHERE s.myId = :pid") @Query("DELETE FROM Search s WHERE s.myId = :pid")
void deleteByPid(@Param("pid") Long theId); void deleteByPid(@Param("pid") Long theId);
@Modifying
@Query("DELETE FROM Search s WHERE s.myId in (:pids)")
void deleteByPids(@Param("pids") Collection<Long> theSearchToDelete);
} }

View File

@ -20,14 +20,18 @@
package ca.uhn.fhir.jpa.dao.data; package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.SearchInclude; import ca.uhn.fhir.jpa.entity.SearchInclude;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
import java.util.Collection;
public interface ISearchIncludeDao extends JpaRepository<SearchInclude, Long>, IHapiFhirJpaRepository { public interface ISearchIncludeDao extends JpaRepository<SearchInclude, Long>, IHapiFhirJpaRepository {
@Modifying @Modifying
@Query(value = "DELETE FROM SearchInclude r WHERE r.mySearchPid = :search") @Query(value = "DELETE FROM SearchInclude r WHERE r.mySearchPid in (:search)")
void deleteForSearch(@Param("search") Long theSearchPid); @CanIgnoreReturnValue
int deleteForSearch(@Param("search") Collection<Long> theSearchPid);
} }

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.dao.data; package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.SearchResult; import ca.uhn.fhir.jpa.entity.SearchResult;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice; import org.springframework.data.domain.Slice;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
@ -27,6 +28,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.List; import java.util.List;
public interface ISearchResultDao extends JpaRepository<SearchResult, Long>, IHapiFhirJpaRepository { public interface ISearchResultDao extends JpaRepository<SearchResult, Long>, IHapiFhirJpaRepository {
@ -37,12 +39,19 @@ public interface ISearchResultDao extends JpaRepository<SearchResult, Long>, IHa
@Query(value = "SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearchPid = :search") @Query(value = "SELECT r.myResourcePid FROM SearchResult r WHERE r.mySearchPid = :search")
List<Long> findWithSearchPidOrderIndependent(@Param("search") Long theSearchPid); List<Long> findWithSearchPidOrderIndependent(@Param("search") Long theSearchPid);
@Query(value = "SELECT r.myId FROM SearchResult r WHERE r.mySearchPid = :search") @Modifying
Slice<Long> findForSearch(Pageable thePage, @Param("search") Long theSearchPid); @Query("DELETE FROM SearchResult s WHERE s.mySearchPid IN :searchIds")
@CanIgnoreReturnValue
int deleteBySearchIds(@Param("searchIds") Collection<Long> theSearchIds);
@Modifying @Modifying
@Query("DELETE FROM SearchResult s WHERE s.myId IN :ids") @Query(
void deleteByIds(@Param("ids") List<Long> theContent); "DELETE FROM SearchResult s WHERE s.mySearchPid = :searchId and s.myOrder >= :rangeStart and s.myOrder <= :rangeEnd")
@CanIgnoreReturnValue
int deleteBySearchIdInRange(
@Param("searchId") Long theSearchId,
@Param("rangeStart") int theRangeStart,
@Param("rangeEnd") int theRangeEnd);
@Query("SELECT count(r) FROM SearchResult r WHERE r.mySearchPid = :search") @Query("SELECT count(r) FROM SearchResult r WHERE r.mySearchPid = :search")
int countForSearch(@Param("search") Long theSearchPid); int countForSearch(@Param("search") Long theSearchPid);

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.dao.data;
import java.util.Objects;
/**
* Record for search result returning the PK of a Search, and the number of associated SearchResults
*/
public class SearchIdAndResultSize {
/** Search PK */
public final long searchId;
/** Number of SearchResults attached */
public final int size;
public SearchIdAndResultSize(long theSearchId, Integer theSize) {
searchId = theSearchId;
size = Objects.requireNonNullElse(theSize, 0);
}
}

View File

@ -37,21 +37,22 @@ public class SearchResult implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Deprecated(since = "6.10", forRemoval = true) // migrating to composite PK on searchPid,Order
@GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_SEARCH_RES") @GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_SEARCH_RES")
@SequenceGenerator(name = "SEQ_SEARCH_RES", sequenceName = "SEQ_SEARCH_RES") @SequenceGenerator(name = "SEQ_SEARCH_RES", sequenceName = "SEQ_SEARCH_RES")
@Id @Id
@Column(name = "PID") @Column(name = "PID")
private Long myId; private Long myId;
@Column(name = "SEARCH_ORDER", nullable = false, insertable = true, updatable = false) @Column(name = "SEARCH_PID", insertable = true, updatable = false, nullable = false)
private Long mySearchPid;
@Column(name = "SEARCH_ORDER", insertable = true, updatable = false, nullable = false)
private int myOrder; private int myOrder;
@Column(name = "RESOURCE_PID", insertable = true, updatable = false, nullable = false) @Column(name = "RESOURCE_PID", insertable = true, updatable = false, nullable = false)
private Long myResourcePid; private Long myResourcePid;
@Column(name = "SEARCH_PID", insertable = true, updatable = false, nullable = false)
private Long mySearchPid;
/** /**
* Constructor * Constructor
*/ */

View File

@ -25,12 +25,16 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl.SEARCH_CLEANUP_JOB_INTERVAL_MILLIS; import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl.SEARCH_CLEANUP_JOB_INTERVAL_MILLIS;
/** /**
@ -42,7 +46,6 @@ import static ca.uhn.fhir.jpa.search.cache.DatabaseSearchCacheSvcImpl.SEARCH_CLE
// in Smile. // in Smile.
// //
public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc, IHasScheduledJobs { public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc, IHasScheduledJobs {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StaleSearchDeletingSvcImpl.class);
@Autowired @Autowired
private JpaStorageSettings myStorageSettings; private JpaStorageSettings myStorageSettings;
@ -53,7 +56,16 @@ public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc, IHas
@Override @Override
@Transactional(propagation = Propagation.NEVER) @Transactional(propagation = Propagation.NEVER)
public void pollForStaleSearchesAndDeleteThem() { public void pollForStaleSearchesAndDeleteThem() {
mySearchCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions()); mySearchCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions(), getDeadline());
}
/**
* Calculate a deadline to finish before the next scheduled run.
*/
protected Instant getDeadline() {
return Instant.ofEpochMilli(DatabaseSearchCacheSvcImpl.now())
// target a 90% duty-cycle to avoid confusing quartz
.plus((long) (SEARCH_CLEANUP_JOB_INTERVAL_MILLIS * 0.90), ChronoUnit.MILLIS);
} }
@Override @Override

View File

@ -25,29 +25,35 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService.IExecutionBuilder;
import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.system.HapiSystemProperties; import ca.uhn.fhir.system.HapiSystemProperties;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.Session;
import org.hl7.fhir.dstu3.model.InstantType; import org.hl7.fhir.dstu3.model.InstantType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection;
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc { public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
/* /*
@ -56,13 +62,12 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
* type query and this can fail if we have 1000s of params * type query and this can fail if we have 1000s of params
*/ */
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500; public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 20000; public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 50000;
public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE; public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000; public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000;
private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class); private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT; private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
private static int ourMaximumResultsToDeleteInOnePass = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS; private static int ourMaximumResultsToDeleteInOneCommit = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
private static int ourMaximumSearchesToCheckForDeletionCandidacy = DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND;
private static Long ourNowForUnitTests; private static Long ourNowForUnitTests;
/* /*
* We give a bit of extra leeway just to avoid race conditions where a query result * We give a bit of extra leeway just to avoid race conditions where a query result
@ -74,6 +79,9 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
@Autowired @Autowired
private ISearchDao mySearchDao; private ISearchDao mySearchDao;
@Autowired
private EntityManager myEntityManager;
@Autowired @Autowired
private ISearchResultDao mySearchResultDao; private ISearchResultDao mySearchResultDao;
@ -169,14 +177,249 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
return Optional.empty(); return Optional.empty();
} }
/**
* A transient worker for a single pass through stale-search deletion.
*/
class DeleteRun {
final RequestPartitionId myRequestPartitionId;
final Instant myDeadline;
final Date myCutoffForDeletion;
final Set<Long> myUpdateDeletedFlagBatch = new HashSet<>();
final Set<Long> myDeleteSearchBatch = new HashSet<>();
/** the Search pids of the SearchResults we plan to delete in a chunk */
final Set<Long> myDeleteSearchResultsBatch = new HashSet<>();
/**
* Number of results we have queued up in mySearchPidsToDeleteResults to delete.
* We try to keep this to a reasonable size to avoid long transactions that may escalate to a table lock.
*/
private int myDeleteSearchResultsBatchCount = 0;
DeleteRun(Instant theDeadline, Date theCutoffForDeletion, RequestPartitionId theRequestPartitionId) {
myDeadline = theDeadline;
myCutoffForDeletion = theCutoffForDeletion;
myRequestPartitionId = theRequestPartitionId;
}
/**
* Mark all ids in the mySearchesToMarkForDeletion buffer as deleted, and clear the buffer.
*/
public void flushDeleteMarks() {
if (myUpdateDeletedFlagBatch.isEmpty()) {
return;
}
ourLog.debug("Marking {} searches as deleted", myUpdateDeletedFlagBatch.size());
mySearchDao.updateDeleted(myUpdateDeletedFlagBatch, true);
myUpdateDeletedFlagBatch.clear();
commitOpenChanges();
}
/**
* Dig into the guts of our Hibernate session, flush any changes in the session, and commit the underlying connection.
*/
private void commitOpenChanges() {
// flush to force Hibernate to actually get a connection from the pool
myEntityManager.flush();
// get our connection from the underlying Hibernate session, and commit
//noinspection resource
myEntityManager.unwrap(Session.class).doWork(Connection::commit);
}
void throwIfDeadlineExpired() {
boolean result = Instant.ofEpochMilli(now()).isAfter(myDeadline);
if (result) {
throw new DeadlineException(
Msg.code(2443) + "Deadline expired while cleaning Search cache - " + myDeadline);
}
}
private int deleteMarkedSearchesInBatches() {
AtomicInteger deletedCounter = new AtomicInteger(0);
try (final Stream<SearchIdAndResultSize> toDelete = mySearchDao.findDeleted()) {
assert toDelete != null;
toDelete.forEach(nextSearchToDelete -> {
throwIfDeadlineExpired();
deleteSearchAndResults(nextSearchToDelete.searchId, nextSearchToDelete.size);
deletedCounter.incrementAndGet();
});
}
// flush anything left in the buffers
flushSearchResultDeletes();
flushSearchAndIncludeDeletes();
int deletedCount = deletedCounter.get();
ourLog.info("Deleted {} expired searches", deletedCount);
return deletedCount;
}
/**
* Schedule theSearchPid for deletion assuming it has theNumberOfResults SearchResults attached.
*
* We accumulate a batch of search pids for deletion, and then do a bulk DML as we reach a threshold number
* of SearchResults.
*
* @param theSearchPid pk of the Search
* @param theNumberOfResults the number of SearchResults attached
*/
private void deleteSearchAndResults(long theSearchPid, int theNumberOfResults) {
ourLog.trace("Buffering deletion of search pid {} and {} results", theSearchPid, theNumberOfResults);
myDeleteSearchBatch.add(theSearchPid);
if (theNumberOfResults > ourMaximumResultsToDeleteInOneCommit) {
// don't buffer this one - do it inline
deleteSearchResultsByChunk(theSearchPid, theNumberOfResults);
return;
}
myDeleteSearchResultsBatch.add(theSearchPid);
myDeleteSearchResultsBatchCount += theNumberOfResults;
if (myDeleteSearchResultsBatchCount > ourMaximumResultsToDeleteInOneCommit) {
flushSearchResultDeletes();
}
if (myDeleteSearchBatch.size() > ourMaximumResultsToDeleteInOneStatement) {
// flush the results to make sure we don't have any references.
flushSearchResultDeletes();
flushSearchAndIncludeDeletes();
}
}
/**
* If this Search has more results than our max delete size,
* delete in by itself in range chunks.
* @param theSearchPid the target Search pid
* @param theNumberOfResults the number of search results present
*/
private void deleteSearchResultsByChunk(long theSearchPid, int theNumberOfResults) {
ourLog.debug(
"Search {} is large: has {} results. Deleting results in chunks.",
theSearchPid,
theNumberOfResults);
for (int rangeEnd = theNumberOfResults; rangeEnd >= 0; rangeEnd -= ourMaximumResultsToDeleteInOneCommit) {
int rangeStart = rangeEnd - ourMaximumResultsToDeleteInOneCommit;
ourLog.trace("Deleting results for search {}: {} - {}", theSearchPid, rangeStart, rangeEnd);
mySearchResultDao.deleteBySearchIdInRange(theSearchPid, rangeStart, rangeEnd);
commitOpenChanges();
}
}
private void flushSearchAndIncludeDeletes() {
if (myDeleteSearchBatch.isEmpty()) {
return;
}
ourLog.debug("Deleting {} Search records", myDeleteSearchBatch.size());
// referential integrity requires we delete includes before the search
mySearchIncludeDao.deleteForSearch(myDeleteSearchBatch);
mySearchDao.deleteByPids(myDeleteSearchBatch);
myDeleteSearchBatch.clear();
commitOpenChanges();
}
private void flushSearchResultDeletes() {
if (myDeleteSearchResultsBatch.isEmpty()) {
return;
}
ourLog.debug(
"Deleting {} Search Results from {} searches",
myDeleteSearchResultsBatchCount,
myDeleteSearchResultsBatch.size());
mySearchResultDao.deleteBySearchIds(myDeleteSearchResultsBatch);
myDeleteSearchResultsBatch.clear();
myDeleteSearchResultsBatchCount = 0;
commitOpenChanges();
}
IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest().withRequestPartitionId(myRequestPartitionId);
}
private void run() {
ourLog.debug("Searching for searches which are before {}", myCutoffForDeletion);
// this tx builder is not really for tx management.
// Instead, it is used bind a Hibernate session + connection to this thread.
// We will run a streaming query to look for work, and then commit changes in batches during the loops.
getTxBuilder().execute(theStatus -> {
try {
markDeletedInBatches();
throwIfDeadlineExpired();
// Delete searches that are marked as deleted
int deletedCount = deleteMarkedSearchesInBatches();
throwIfDeadlineExpired();
if ((ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) && (deletedCount > 0)) {
Long total = mySearchDao.count();
ourLog.debug("Deleted {} searches, {} remaining", deletedCount, total);
}
} catch (DeadlineException theTimeoutException) {
ourLog.warn(theTimeoutException.getMessage());
}
return null;
});
}
/**
* Stream through a list of pids before our cutoff, and set myDeleted=true in batches in a DML statement.
*/
private void markDeletedInBatches() {
try (Stream<Long> toMarkDeleted =
mySearchDao.findWhereCreatedBefore(myCutoffForDeletion, new Date(now()))) {
assert toMarkDeleted != null;
toMarkDeleted.forEach(nextSearchToDelete -> {
throwIfDeadlineExpired();
if (myUpdateDeletedFlagBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
flushDeleteMarks();
}
ourLog.trace("Marking search with PID {} as ready for deletion", nextSearchToDelete);
myUpdateDeletedFlagBatch.add(nextSearchToDelete);
});
flushDeleteMarks();
}
}
}
/**
* Marker to abandon our delete run when we are over time.
*/
private static class DeadlineException extends RuntimeException {
public DeadlineException(String message) {
super(message);
}
}
@Override @Override
public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId) { public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline) {
HapiTransactionService.noTransactionAllowed(); HapiTransactionService.noTransactionAllowed();
if (!myStorageSettings.isExpireSearchResults()) { if (!myStorageSettings.isExpireSearchResults()) {
return; return;
} }
final Date cutoff = getCutoff();
final DeleteRun run = new DeleteRun(theDeadline, cutoff, theRequestPartitionId);
run.run();
}
@Nonnull
private Date getCutoff() {
long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis(); long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis();
if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
cutoffMillis = cutoffMillis + myStorageSettings.getReuseCachedSearchResultsForMillis(); cutoffMillis = cutoffMillis + myStorageSettings.getReuseCachedSearchResultsForMillis();
@ -189,108 +432,16 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
new InstantType(cutoff), new InstantType(cutoff),
new InstantType(new Date(now()))); new InstantType(new Date(now())));
} }
return cutoff;
ourLog.debug("Searching for searches which are before {}", cutoff);
// Mark searches as deleted if they should be
final Slice<Long> toMarkDeleted = myTransactionService
.withSystemRequestOnPartition(theRequestPartitionId)
.execute(theStatus -> mySearchDao.findWhereCreatedBefore(
cutoff, new Date(), PageRequest.of(0, ourMaximumSearchesToCheckForDeletionCandidacy)));
assert toMarkDeleted != null;
for (final Long nextSearchToDelete : toMarkDeleted) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
myTransactionService
.withSystemRequest()
.withRequestPartitionId(theRequestPartitionId)
.execute(t -> {
mySearchDao.updateDeleted(nextSearchToDelete, true);
return null;
});
}
// Delete searches that are marked as deleted
final Slice<Long> toDelete = myTransactionService
.withSystemRequestOnPartition(theRequestPartitionId)
.execute(theStatus ->
mySearchDao.findDeleted(PageRequest.of(0, ourMaximumSearchesToCheckForDeletionCandidacy)));
assert toDelete != null;
for (final Long nextSearchToDelete : toDelete) {
ourLog.debug("Deleting search with PID {}", nextSearchToDelete);
myTransactionService
.withSystemRequest()
.withRequestPartitionId(theRequestPartitionId)
.execute(t -> {
deleteSearch(nextSearchToDelete);
return null;
});
}
int count = toDelete.getContent().size();
if (count > 0) {
if (ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) {
Long total = myTransactionService
.withSystemRequest()
.withRequestPartitionId(theRequestPartitionId)
.execute(t -> mySearchDao.count());
ourLog.debug("Deleted {} searches, {} remaining", count, total);
}
}
}
private void deleteSearch(final Long theSearchPid) {
mySearchDao.findById(theSearchPid).ifPresent(searchToDelete -> {
mySearchIncludeDao.deleteForSearch(searchToDelete.getId());
/*
* Note, we're only deleting up to 500 results in an individual search here. This
* is to prevent really long running transactions in cases where there are
* huge searches with tons of results in them. By the time we've gotten here
* we have marked the parent Search entity as deleted, so it's not such a
* huge deal to be only partially deleting search results. They'll get deleted
* eventually
*/
int max = ourMaximumResultsToDeleteInOnePass;
Slice<Long> resultPids = mySearchResultDao.findForSearch(PageRequest.of(0, max), searchToDelete.getId());
if (resultPids.hasContent()) {
List<List<Long>> partitions =
Lists.partition(resultPids.getContent(), ourMaximumResultsToDeleteInOneStatement);
for (List<Long> nextPartition : partitions) {
mySearchResultDao.deleteByIds(nextPartition);
}
}
// Only delete if we don't have results left in this search
if (resultPids.getNumberOfElements() < max) {
ourLog.debug(
"Deleting search {}/{} - Created[{}]",
searchToDelete.getId(),
searchToDelete.getUuid(),
new InstantType(searchToDelete.getCreated()));
mySearchDao.deleteByPid(searchToDelete.getId());
} else {
ourLog.debug(
"Purged {} search results for deleted search {}/{}",
resultPids.getSize(),
searchToDelete.getId(),
searchToDelete.getUuid());
}
});
}
@VisibleForTesting
public static void setMaximumSearchesToCheckForDeletionCandidacyForUnitTest(
int theMaximumSearchesToCheckForDeletionCandidacy) {
ourMaximumSearchesToCheckForDeletionCandidacy = theMaximumSearchesToCheckForDeletionCandidacy;
} }
@VisibleForTesting @VisibleForTesting
public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) { public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
ourMaximumResultsToDeleteInOnePass = theMaximumResultsToDeleteInOnePass; ourMaximumResultsToDeleteInOneCommit = theMaximumResultsToDeleteInOnePass;
} }
@VisibleForTesting @VisibleForTesting
public static void setMaximumResultsToDeleteForUnitTest(int theMaximumResultsToDelete) { public static void setMaximumResultsToDeleteInOneStatement(int theMaximumResultsToDelete) {
ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete; ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
} }
@ -302,7 +453,7 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
ourNowForUnitTests = theNowForUnitTests; ourNowForUnitTests = theNowForUnitTests;
} }
private static long now() { public static long now() {
if (ourNowForUnitTests != null) { if (ourNowForUnitTests != null) {
return ourNowForUnitTests; return ourNowForUnitTests;
} }

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.entity.Search;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional; import java.util.Optional;
public interface ISearchCacheSvc { public interface ISearchCacheSvc {
@ -86,5 +87,10 @@ public interface ISearchCacheSvc {
* if they have some other mechanism for expiring stale results other than manually looking for them * if they have some other mechanism for expiring stale results other than manually looking for them
* and deleting them. * and deleting them.
*/ */
void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId); void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline);
@Deprecated(since = "6.10", forRemoval = true) // wipmb delete once cdr merges
default void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId) {
pollForStaleSearchesAndDeleteThem(theRequestPartitionId, Instant.now().plus(1, ChronoUnit.MINUTES));
}
} }

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -72,9 +73,9 @@ public class FhirResourceDaoR4SearchLastNAsyncIT extends BaseR4SearchLastN {
public void testLastNChunking() { public void testLastNChunking() {
runInTransaction(() -> { runInTransaction(() -> {
for (Search search : mySearchDao.findAll()) { Set<Long> all = mySearchDao.findAll().stream().map(Search::getId).collect(Collectors.toSet());
mySearchDao.updateDeleted(search.getId(), true);
} mySearchDao.updateDeleted(all, true);
}); });
// Set up search parameters that will return 75 Observations. // Set up search parameters that will return 75 Observations.

View File

@ -1,7 +1,6 @@
package ca.uhn.fhir.jpa.dao.r4; package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.Search; import ca.uhn.fhir.jpa.entity.Search;
@ -16,6 +15,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date; import java.util.Date;
import java.util.UUID; import java.util.UUID;
@ -31,22 +32,20 @@ public class SearchCoordinatorSvcImplTest extends BaseJpaR4Test {
@Autowired @Autowired
private ISearchResultDao mySearchResultDao; private ISearchResultDao mySearchResultDao;
@Autowired
private ISearchCoordinatorSvc mySearchCoordinator;
@Autowired @Autowired
private ISearchCacheSvc myDatabaseCacheSvc; private ISearchCacheSvc myDatabaseCacheSvc;
@AfterEach @AfterEach
public void after() { public void after() {
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS);
DatabaseSearchCacheSvcImpl.setMaximumSearchesToCheckForDeletionCandidacyForUnitTest(DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND);
} }
/**
* Semi-obsolete test. This used to test incremental deletion, but we now work until done or a timeout.
*/
@Test @Test
public void testDeleteDontMarkPreviouslyMarkedSearchesAsDeleted() { public void testDeleteDontMarkPreviouslyMarkedSearchesAsDeleted() {
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(5); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(5);
DatabaseSearchCacheSvcImpl.setMaximumSearchesToCheckForDeletionCandidacyForUnitTest(10);
runInTransaction(()->{ runInTransaction(()->{
mySearchResultDao.deleteAll(); mySearchResultDao.deleteAll();
@ -86,28 +85,12 @@ public class SearchCoordinatorSvcImplTest extends BaseJpaR4Test {
assertEquals(30, mySearchResultDao.count()); assertEquals(30, mySearchResultDao.count());
}); });
myDatabaseCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions()); myDatabaseCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions(), Instant.now().plus(10, ChronoUnit.SECONDS));
runInTransaction(()->{ runInTransaction(()->{
// We should delete up to 10, but 3 don't get deleted since they have too many results to delete in one pass // We should delete up to 10, but 3 don't get deleted since they have too many results to delete in one pass
assertEquals(13, mySearchDao.count());
assertEquals(3, mySearchDao.countDeleted());
// We delete a max of 5 results per search, so half are gone
assertEquals(15, mySearchResultDao.count());
});
myDatabaseCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions());
runInTransaction(()->{
// Once again we attempt to delete 10, but the first 3 don't get deleted and still remain
// (total is 6 because 3 weren't deleted, and they blocked another 3 that might have been)
assertEquals(6, mySearchDao.count());
assertEquals(6, mySearchDao.countDeleted());
assertEquals(0, mySearchResultDao.count());
});
myDatabaseCacheSvc.pollForStaleSearchesAndDeleteThem(RequestPartitionId.allPartitions());
runInTransaction(()->{
assertEquals(0, mySearchDao.count()); assertEquals(0, mySearchDao.count());
assertEquals(0, mySearchDao.countDeleted()); assertEquals(0, mySearchDao.countDeleted());
// We delete a max of 5 results per search, so half are gone
assertEquals(0, mySearchResultDao.count()); assertEquals(0, mySearchResultDao.count());
}); });
} }

View File

@ -48,7 +48,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
super.after(); super.after();
DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc); DatabaseSearchCacheSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(mySearchCacheSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(DatabaseSearchCacheSvcImpl.SEARCH_CLEANUP_JOB_INTERVAL_MILLIS); staleSearchDeletingSvc.setCutoffSlackForUnitTest(DatabaseSearchCacheSvcImpl.SEARCH_CLEANUP_JOB_INTERVAL_MILLIS);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOneStatement(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(DatabaseSearchCacheSvcImpl.DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS);
} }
@ -108,7 +108,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@Test @Test
public void testDeleteVeryLargeSearch() { public void testDeleteVeryLargeSearch() {
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOneStatement(10);
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(10); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOnePassForUnitTest(10);
runInTransaction(() -> { runInTransaction(() -> {
@ -120,24 +120,21 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
search.setResourceType("Patient"); search.setResourceType("Patient");
search = mySearchEntityDao.save(search); search = mySearchEntityDao.save(search);
for (int i = 0; i < 15; i++) {
ResourceTable resource = new ResourceTable(); ResourceTable resource = new ResourceTable();
resource.setPublished(new Date()); resource.setPublished(new Date());
resource.setUpdated(new Date()); resource.setUpdated(new Date());
resource.setResourceType("Patient"); resource.setResourceType("Patient");
resource = myResourceTableDao.saveAndFlush(resource); resource = myResourceTableDao.saveAndFlush(resource);
for (int i = 0; i < 50; i++) {
SearchResult sr = new SearchResult(search); SearchResult sr = new SearchResult(search);
sr.setOrder(i); sr.setOrder(i);
sr.setResourcePid(resource.getId()); sr.setResourcePid(resource.getId());
mySearchResultDao.save(sr); mySearchResultDao.save(sr);
} }
}); });
// It should take two passes to delete the search fully // we are able to delete this in one pass.
runInTransaction(() -> assertEquals(1, mySearchEntityDao.count()));
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
runInTransaction(() -> assertEquals(1, mySearchEntityDao.count())); runInTransaction(() -> assertEquals(1, mySearchEntityDao.count()));
myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem(); myStaleSearchDeletingSvc.pollForStaleSearchesAndDeleteThem();
runInTransaction(() -> assertEquals(0, mySearchEntityDao.count())); runInTransaction(() -> assertEquals(0, mySearchEntityDao.count()));
@ -146,7 +143,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@Test @Test
public void testDeleteVerySmallSearch() { public void testDeleteVerySmallSearch() {
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOneStatement(10);
runInTransaction(() -> { runInTransaction(() -> {
Search search = new Search(); Search search = new Search();
@ -172,7 +169,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
@Test @Test
public void testDontDeleteSearchBeforeExpiry() { public void testDontDeleteSearchBeforeExpiry() {
DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteForUnitTest(10); DatabaseSearchCacheSvcImpl.setMaximumResultsToDeleteInOneStatement(10);
runInTransaction(() -> { runInTransaction(() -> {
Search search = new Search(); Search search = new Search();
@ -186,7 +183,7 @@ public class StaleSearchDeletingSvcR4Test extends BaseResourceProviderR4Test {
search.setCreated(DateUtils.addDays(new Date(), -10000)); search.setCreated(DateUtils.addDays(new Date(), -10000));
search.setSearchType(SearchTypeEnum.SEARCH); search.setSearchType(SearchTypeEnum.SEARCH);
search.setResourceType("Patient"); search.setResourceType("Patient");
search = mySearchEntityDao.save(search); mySearchEntityDao.save(search);
}); });

View File

@ -65,6 +65,7 @@ public class ConnectionWrapper implements Connection {
@Override @Override
public void commit() throws SQLException { public void commit() throws SQLException {
if (ourLog.isTraceEnabled()) { ourLog.trace("commit: {}", myWrap.hashCode()); }
myWrap.commit(); myWrap.commit();
} }

View File

@ -46,6 +46,7 @@ public class ConnectionWrapper implements Connection {
@Override @Override
public void commit() throws SQLException { public void commit() throws SQLException {
if (ourLog.isTraceEnabled()) { ourLog.trace("Commit: {}", myWrap.hashCode()); }
myWrap.commit(); myWrap.commit();
} }

View File

@ -514,7 +514,8 @@ public class HapiTransactionService implements IHapiTransactionService {
} }
@Nullable @Nullable
private static <T> T executeInExistingTransaction(TransactionCallback<T> theCallback) { private static <T> T executeInExistingTransaction(@Nonnull TransactionCallback<T> theCallback) {
// TODO we could probably track the TransactionStatus we need as a thread local like we do our partition id.
return theCallback.doInTransaction(null); return theCallback.doInTransaction(null);
} }