Use streams to avoid multiple queries during batch job id chunking. (#5444)

Use stream for chunking instead of repeated sorted query pages.
This commit is contained in:
Michael Buckley 2023-11-20 17:39:36 -05:00 committed by GitHub
parent 6f84d17b13
commit 6abfed603e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 851 additions and 328 deletions

View File

@ -0,0 +1,57 @@
/*-
* #%L
* HAPI FHIR - Core Library
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.util;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamUtil {
/** Static util class */
private StreamUtil() {}
/**
* Chunk the stream into Lists of size theChunkSize.
* The last chunk will be smaller unless the stream size is evenly divisible.
* Closes the underlying stream when done.
*
* @param theStream the input stream
* @param theChunkSize the chunk size.
* @return a stream of chunks
*/
public static <T> Stream<List<T>> partition(Stream<T> theStream, int theChunkSize) {
Spliterator<T> spliterator = theStream.spliterator();
Iterator<T> iterator = Spliterators.iterator(spliterator);
UnmodifiableIterator<List<T>> partition = Iterators.partition(iterator, theChunkSize);
// we could be fancier here and support parallel, and sizes; but serial-only is fine for now.
Spliterator<List<T>> partitionedSpliterator = Spliterators.spliteratorUnknownSize(partition, 0);
Stream<List<T>> result = StreamSupport.stream(partitionedSpliterator, false);
// we lose close() via the Iterator. Add it back.
return result.onClose(theStream::close);
}
}

View File

@ -0,0 +1,65 @@
package ca.uhn.fhir.util;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
class StreamUtilTest {
@ParameterizedTest
@MethodSource("streamPartitionTestCases")
void testStreamPartitionBy4(String theCase, List<Integer> theInput, List<List<Integer>> theOutput) {
List<List<Integer>> result = StreamUtil.partition(theInput.stream(), 4).toList();
assertEquals(theOutput, result, theCase);
}
static Object[][] streamPartitionTestCases() {
return new Object[][]{
{
"empty list produces empty stream",
List.of(),
List.of()
},
{
"short list produces single chunk",
List.of(1, 2, 3),
List.of(List.of(1, 2, 3))
},
{
"longer list produces several chunks",
List.of(1, 2, 3, 1, 2, 3, 1, 2, 3),
List.of(List.of(1, 2, 3, 1), List.of(2, 3, 1, 2), List.of(3))
},
{
"even size produces even chunks",
List.of(1, 2, 3,4,5,6,7,8),
List.of(List.of(1, 2, 3,4), List.of(5,6,7,8))
},
};
}
@Test
void testStreamPartitionClosesOriginalStream() {
// given
AtomicBoolean closed = new AtomicBoolean(false);
Stream<Integer> baseStream = Stream.of(1, 2, 3).onClose(()->closed.set(true));
// when
StreamUtil.partition(baseStream, 2).close();
// then
assertThat("partition closed underlying stream", closed.get());
}
}

View File

@ -0,0 +1,4 @@
---
type: change
issue: 5444
title: "The reindexing and mdm-clear batch jobs now stream results internally for more reliable operation."

View File

@ -47,15 +47,9 @@ public class Batch2SupportConfig {
MatchUrlService theMatchUrlService,
DaoRegistry theDaoRegistry,
FhirContext theFhirContext,
IHapiTransactionService theTransactionService,
JpaStorageSettings theJpaStorageSettings) {
IHapiTransactionService theTransactionService) {
return new Batch2DaoSvcImpl(
theResourceTableDao,
theMatchUrlService,
theDaoRegistry,
theFhirContext,
theTransactionService,
theJpaStorageSettings);
theResourceTableDao, theMatchUrlService, theDaoRegistry, theFhirContext, theTransactionService);
}
@Bean

View File

@ -117,6 +117,8 @@ import ca.uhn.fhir.validation.IValidatorModule;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseMetaType;
@ -150,6 +152,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
@ -2067,6 +2070,28 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
});
}
public <PID extends IResourcePersistentId<?>> Stream<PID> searchForIdStream(
SearchParameterMap theParams,
RequestDetails theRequest,
@Nullable IBaseResource theConditionalOperationTargetOrNull) {
// the Stream is useless outside the bound connection time, so require our caller to have a session.
HapiTransactionService.requireTransaction();
RequestPartitionId requestPartitionId =
myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(
theRequest, myResourceName, theParams, theConditionalOperationTargetOrNull);
ISearchBuilder<?> builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
String uuid = UUID.randomUUID().toString();
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
IResultIterator<PID> iter =
builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId);
// Adapt IResultIterator to stream, and connect the close handler.
return Streams.stream(iter).onClose(() -> IOUtils.closeQuietly(iter));
}
protected <MT extends IBaseMetaType> MT toMetaDt(Class<MT> theType, Collection<TagDefinition> tagDefinitions) {
MT retVal = ReflectionUtil.newInstance(theType);
for (TagDefinition next : tagDefinitions) {

View File

@ -35,6 +35,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@Transactional(propagation = Propagation.MANDATORY)
public interface IResourceTableDao
@ -65,13 +66,10 @@ public interface IResourceTableDao
Slice<Long> findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
*/
@Query(
"SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Slice<Object[]> findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
Stream<Object[]> streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
@Param("low") Date theLow, @Param("high") Date theHigh);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
@ -84,6 +82,13 @@ public interface IResourceTableDao
@Param("high") Date theHigh,
@Param("partition_ids") List<Integer> theRequestPartitionIds);
@Query(
"SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high AND t.myPartitionIdValue IN (:partition_ids) ORDER BY t.myUpdated ASC")
Stream<Object[]> streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(
@Param("low") Date theLow,
@Param("high") Date theHigh,
@Param("partition_ids") List<Integer> theRequestPartitionIds);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
*/
@ -92,6 +97,11 @@ public interface IResourceTableDao
Slice<Object[]> findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
@Query(
"SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Stream<Object[]> streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
@Param("low") Date theLow, @Param("high") Date theHigh);
// TODO in the future, consider sorting by pid as well so batch jobs process in the same order across restarts
@Query(
"SELECT t.myId FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high AND t.myPartitionIdValue = :partition_id ORDER BY t.myUpdated ASC")

View File

@ -23,13 +23,13 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.pid.EmptyResourcePidList;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.MixedResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.StreamTemplate;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.pid.TypedResourceStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
@ -37,18 +37,15 @@ import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import ca.uhn.fhir.util.DateRangeUtil;
import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -65,8 +62,6 @@ public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
private final IHapiTransactionService myTransactionService;
private final JpaStorageSettings myJpaStorageSettings;
@Override
public boolean isAllResourceTypeSupported() {
return true;
@ -77,113 +72,110 @@ public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
MatchUrlService theMatchUrlService,
DaoRegistry theDaoRegistry,
FhirContext theFhirContext,
IHapiTransactionService theTransactionService,
JpaStorageSettings theJpaStorageSettings) {
IHapiTransactionService theTransactionService) {
myResourceTableDao = theResourceTableDao;
myMatchUrlService = theMatchUrlService;
myDaoRegistry = theDaoRegistry;
myFhirContext = theFhirContext;
myTransactionService = theTransactionService;
myJpaStorageSettings = theJpaStorageSettings;
}
@Override
public IResourcePidList fetchResourceIdsPage(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
return myTransactionService
.withSystemRequest()
.withRequestPartitionId(theRequestPartitionId)
.execute(() -> {
public IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) {
if (theUrl == null) {
return fetchResourceIdsPageNoUrl(theStart, theEnd, theRequestPartitionId);
return makeStreamResult(
theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId));
} else {
return fetchResourceIdsPageWithUrl(theEnd, theUrl, theRequestPartitionId);
return makeStreamResult(
theRequestPartitionId,
() -> streamResourceIdsWithUrl(theStart, theEnd, theUrl, theRequestPartitionId));
}
});
}
private Stream<TypedResourcePid> streamResourceIdsWithUrl(
Date theStart, Date theEnd, String theUrl, RequestPartitionId theRequestPartitionId) {
validateUrl(theUrl);
SearchParameterMap searchParamMap = parseQuery(theUrl);
searchParamMap.setLastUpdated(DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd));
String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId);
return dao.searchForIdStream(searchParamMap, request, null).map(pid -> new TypedResourcePid(resourceType, pid));
}
private static TypedResourcePid typedPidFromQueryArray(Object[] thePidTypeDateArray) {
String resourceType = (String) thePidTypeDateArray[1];
Long pid = (Long) thePidTypeDateArray[0];
return new TypedResourcePid(resourceType, JpaPid.fromId(pid));
}
@Nonnull
private HomogeneousResourcePidList fetchResourceIdsPageWithUrl(
Date theEnd, @Nonnull String theUrl, @Nullable RequestPartitionId theRequestPartitionId) {
private TypedResourceStream makeStreamResult(
RequestPartitionId theRequestPartitionId, Supplier<Stream<TypedResourcePid>> streamSupplier) {
IHapiTransactionService.IExecutionBuilder txSettings =
myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId);
StreamTemplate<TypedResourcePid> streamTemplate =
StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings);
return new TypedResourceStream(theRequestPartitionId, streamTemplate);
}
@Nonnull
private Stream<TypedResourcePid> streamResourceIdsNoUrl(
Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {
Stream<Object[]> rowStream;
if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
ourLog.debug("Search for resources - all partitions");
rowStream = myResourceTableDao.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
theStart, theEnd);
} else if (theRequestPartitionId.isDefaultPartition()) {
ourLog.debug("Search for resources - default partition");
rowStream =
myResourceTableDao
.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
theStart, theEnd);
} else {
ourLog.debug("Search for resources - partition {}", theRequestPartitionId);
rowStream =
myResourceTableDao
.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(
theStart, theEnd, theRequestPartitionId.getPartitionIds());
}
return rowStream.map(Batch2DaoSvcImpl::typedPidFromQueryArray);
}
@Deprecated(since = "6.11", forRemoval = true) // delete once the default method in the interface is gone.
@Override
public IResourcePidList fetchResourceIdsPage(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
Validate.isTrue(false, "Unimplemented");
return null;
}
private static void validateUrl(@Nonnull String theUrl) {
if (!theUrl.contains("?")) {
throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'");
}
final Integer internalSynchronousSearchSize = myJpaStorageSettings.getInternalSynchronousSearchSize();
if (internalSynchronousSearchSize == null || internalSynchronousSearchSize <= 0) {
throw new InternalErrorException(Msg.code(2423)
+ "this should never happen: internalSynchronousSearchSize is null or less than or equal to 0");
}
List<IResourcePersistentId> currentIds = fetchResourceIdsPageWithUrl(0, theUrl, theRequestPartitionId);
ourLog.debug("FIRST currentIds: {}", currentIds.size());
final List<IResourcePersistentId> allIds = new ArrayList<>(currentIds);
while (internalSynchronousSearchSize < currentIds.size()) {
// Ensure the offset is set to the last ID in the cumulative List, otherwise, we'll be stuck in an infinite
// loop here:
currentIds = fetchResourceIdsPageWithUrl(allIds.size(), theUrl, theRequestPartitionId);
ourLog.debug("NEXT currentIds: {}", currentIds.size());
allIds.addAll(currentIds);
}
final String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
return new HomogeneousResourcePidList(resourceType, allIds, theEnd, theRequestPartitionId);
}
private List<IResourcePersistentId> fetchResourceIdsPageWithUrl(
int theOffset, String theUrl, RequestPartitionId theRequestPartitionId) {
@Nonnull
private SearchParameterMap parseQuery(String theUrl) {
String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def);
searchParamMap.setSort(new SortSpec(Constants.PARAM_ID, SortOrderEnum.ASC));
searchParamMap.setOffset(theOffset);
searchParamMap.setLoadSynchronousUpTo(myJpaStorageSettings.getInternalSynchronousSearchSize() + 1);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
SystemRequestDetails request = new SystemRequestDetails();
request.setRequestPartitionId(theRequestPartitionId);
return dao.searchForIds(searchParamMap, request);
}
@Nonnull
private IResourcePidList fetchResourceIdsPageNoUrl(
Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {
final Pageable page = Pageable.unpaged();
Slice<Object[]> slice;
if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
slice = myResourceTableDao.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
page, theStart, theEnd);
} else if (theRequestPartitionId.isDefaultPartition()) {
slice =
myResourceTableDao
.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
page, theStart, theEnd);
} else {
slice =
myResourceTableDao
.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(
page, theStart, theEnd, theRequestPartitionId.getPartitionIds());
}
List<Object[]> content = slice.getContent();
if (content.isEmpty()) {
return new EmptyResourcePidList();
}
List<IResourcePersistentId> ids =
content.stream().map(t -> JpaPid.fromId((Long) t[0])).collect(Collectors.toList());
List<String> types = content.stream().map(t -> (String) t[1]).collect(Collectors.toList());
Date lastDate = (Date) content.get(content.size() - 1)[2];
return new MixedResourcePidList(types, ids, lastDate, theRequestPartitionId);
// this matches idx_res_type_del_updated
searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED).setChain(new SortSpec(Constants.PARAM_PID)));
// TODO this limits us to 2G resources.
searchParamMap.setLoadSynchronousUpTo(Integer.MAX_VALUE);
return searchParamMap;
}
}

View File

@ -339,7 +339,7 @@ public class SearchBuilder implements ISearchBuilder<JpaPid> {
@SuppressWarnings("ConstantConditions")
@Override
public IResultIterator createQuery(
public IResultIterator<JpaPid> createQuery(
SearchParameterMap theParams,
SearchRuntimeDetails theSearchRuntimeDetails,
RequestDetails theRequest,

View File

@ -23,7 +23,6 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.Search;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
public interface ISearchCacheSvc {
@ -88,9 +87,4 @@ public interface ISearchCacheSvc {
* and deleting them.
*/
void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline);
@Deprecated(since = "6.10", forRemoval = true) // wipmb delete once cdr merges
default void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId) {
pollForStaleSearchesAndDeleteThem(theRequestPartitionId, Instant.now().plus(1, ChronoUnit.MINUTES));
}
}

View File

@ -1,3 +1,22 @@
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.term;
import ca.uhn.fhir.context.FhirContext;

View File

@ -24,9 +24,12 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.StreamTemplate;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.pid.TypedResourceStream;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.mdm.api.MdmConstants;
@ -34,7 +37,6 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.util.DateRangeUtil;
@ -42,7 +44,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -56,24 +59,31 @@ public class GoldenResourceSearchSvcImpl implements IGoldenResourceSearchSvc {
@Autowired
private FhirContext myFhirContext;
@Autowired
private IHapiTransactionService myTransactionService;
@Override
@Transactional
public IResourcePidList fetchGoldenResourceIdsPage(
public IResourcePidStream fetchGoldenResourceIdStream(
Date theStart,
Date theEnd,
@Nonnull Integer thePageSize,
@Nullable RequestPartitionId theRequestPartitionId,
@Nonnull String theResourceType) {
return fetchResourceIdsPageWithResourceType(
theStart, theEnd, thePageSize, theResourceType, theRequestPartitionId);
IHapiTransactionService.IExecutionBuilder txSettings =
myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId);
Supplier<Stream<TypedResourcePid>> streamSupplier =
() -> fetchResourceIdsPageWithResourceType(theStart, theEnd, theResourceType, theRequestPartitionId);
StreamTemplate<TypedResourcePid> streamTemplate =
StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings);
return new TypedResourceStream(theRequestPartitionId, streamTemplate);
}
private IResourcePidList fetchResourceIdsPageWithResourceType(
Date theStart,
Date theEnd,
int thePageSize,
String theResourceType,
RequestPartitionId theRequestPartitionId) {
private Stream<TypedResourcePid> fetchResourceIdsPageWithResourceType(
Date theStart, Date theEnd, String theResourceType, RequestPartitionId theRequestPartitionId) {
RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(theResourceType);
@ -89,15 +99,9 @@ public class GoldenResourceSearchSvcImpl implements IGoldenResourceSearchSvc {
searchParamMap.add(Constants.PARAM_TAG, goldenRecordStatusToken);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
SystemRequestDetails request = new SystemRequestDetails();
request.setRequestPartitionId(theRequestPartitionId);
List<IResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId);
Date lastDate = null;
if (ids.size() > 0) {
lastDate = dao.readByPid(ids.get(ids.size() - 1)).getMeta().getLastUpdated();
}
return new HomogeneousResourcePidList(theResourceType, ids, lastDate, theRequestPartitionId);
return dao.searchForIdStream(searchParamMap, request, null)
.map(pid -> new TypedResourcePid(theResourceType, pid));
}
}

View File

@ -5,9 +5,11 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.api.pid.StreamTemplate;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.BaseStorageDao;
import ca.uhn.fhir.jpa.dao.JpaResourceDao;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.NormalizedQuantitySearchLevel;
@ -33,6 +35,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.DateRangeParam;
@ -119,24 +122,27 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;
import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
@ -165,6 +171,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings({"unchecked", "deprecation", "Duplicates"})
public class FhirResourceDaoR4Test extends BaseJpaR4Test {
@Autowired
IHapiTransactionService myHapiTransactionService;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4Test.class);
@AfterEach
@ -4260,6 +4269,30 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
assertThat(actualNameList, contains(namesInAlpha));
}
@Test
void testSearchForStream_carriesTxContext() {
// given
Set<String> createdIds = IntStream.range(1, 5)
.mapToObj(i -> createObservation().getIdPart())
.collect(Collectors.toSet());
SystemRequestDetails request = new SystemRequestDetails();
// call within a tx, but carry the tx definition in the StreamTemplate
StreamTemplate<IResourcePersistentId<?>> streamTemplate =
StreamTemplate.fromSupplier(() -> myObservationDao.searchForIdStream(new SearchParameterMap(), request, null))
.withTransactionAdvice(newTxTemplate());
// does the stream work?
Set<String> ids = streamTemplate.call(stream->
stream.map(typedId->typedId.getId().toString())
.collect(Collectors.toSet()));
assertEquals(ids, createdIds);
}
public static void assertConflictException(String theResourceType, ResourceVersionConflictException e) {
assertThat(e.getMessage(), matchesPattern(
Msg.code(550) + Msg.code(515) + "Unable to delete [a-zA-Z]+/[0-9]+ because at least one resource has a reference to this resource. First reference found was resource " + theResourceType + "/[0-9]+ in path [a-zA-Z]+.[a-zA-Z]+"));

View File

@ -1,14 +1,13 @@
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.BeforeEach;
@ -24,12 +23,10 @@ import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
class Batch2DaoSvcImplTest extends BaseJpaR4Test {
@ -37,49 +34,46 @@ class Batch2DaoSvcImplTest extends BaseJpaR4Test {
private static final Date TOMORROW = toDate(LocalDate.now().plusDays(1));
private static final String URL_PATIENT_EXPUNGE_TRUE = "Patient?_expunge=true";
private static final String PATIENT = "Patient";
private static final int INTERNAL_SYNCHRONOUS_SEARCH_SIZE = 10;
@Autowired
private JpaStorageSettings myJpaStorageSettings;
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private IHapiTransactionService myIHapiTransactionService ;
private DaoRegistry mySpiedDaoRegistry;
private IBatch2DaoSvc mySubject;
@BeforeEach
void beforeEach() {
myJpaStorageSettings.setInternalSynchronousSearchSize(INTERNAL_SYNCHRONOUS_SEARCH_SIZE);
mySpiedDaoRegistry = spy(myDaoRegistry);
mySubject = new Batch2DaoSvcImpl(myResourceTableDao, myMatchUrlService, mySpiedDaoRegistry, myFhirContext, myIHapiTransactionService, myJpaStorageSettings);
mySubject = new Batch2DaoSvcImpl(myResourceTableDao, myMatchUrlService, myDaoRegistry, myFhirContext, myIHapiTransactionService);
}
// TODO: LD this test won't work with the nonUrl variant yet: error: No existing transaction found for transaction marked with propagation 'mandatory'
@Test
void fetchResourcesByUrlEmptyUrl() {
final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), ""));
final InternalErrorException exception =
assertThrows(
InternalErrorException.class,
() -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "")
.visitStream(Stream::toList));
assertEquals("HAPI-2422: this should never happen: URL is missing a '?'", exception.getMessage());
}
@Test
void fetchResourcesByUrlSingleQuestionMark() {
final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), "?"));
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "?").visitStream(Stream::toList));
assertEquals("HAPI-2223: theResourceName must not be blank", exception.getMessage());
assertEquals("theResourceName must not be blank", exception.getMessage());
}
@Test
void fetchResourcesByUrlNonsensicalResource() {
final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), "Banana?_expunge=true"));
final DataFormatException exception = assertThrows(DataFormatException.class, () -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "Banana?_expunge=true").visitStream(Stream::toList));
assertEquals("HAPI-2223: HAPI-1684: Unknown resource name \"Banana\" (this name is not known in FHIR version \"R4\")", exception.getMessage());
assertEquals("HAPI-1684: Unknown resource name \"Banana\" (this name is not known in FHIR version \"R4\")", exception.getMessage());
}
@ParameterizedTest
@ -89,16 +83,12 @@ class Batch2DaoSvcImplTest extends BaseJpaR4Test {
.mapToObj(num -> createPatient())
.toList();
final IResourcePidList resourcePidList = mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, 800, RequestPartitionId.defaultPartition(), URL_PATIENT_EXPUNGE_TRUE);
final IResourcePidStream resourcePidList = mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), URL_PATIENT_EXPUNGE_TRUE);
final List<? extends IIdType> actualPatientIds =
resourcePidList.getTypedResourcePids()
.stream()
.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList();
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
verify(mySpiedDaoRegistry, times(getExpectedNumOfInvocations(expectedNumResults))).getResourceDao(PATIENT);
}
@ParameterizedTest
@ -109,22 +99,14 @@ class Batch2DaoSvcImplTest extends BaseJpaR4Test {
.mapToObj(num -> createPatient())
.toList();
final IResourcePidList resourcePidList = mySubject.fetchResourceIdsPage(PREVIOUS_MILLENNIUM, TOMORROW, pageSizeWellBelowThreshold, RequestPartitionId.defaultPartition(), null);
final IResourcePidStream resourcePidList = mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), null);
final List<? extends IIdType> actualPatientIds =
resourcePidList.getTypedResourcePids()
.stream()
.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList();
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
}
private int getExpectedNumOfInvocations(int expectedNumResults) {
final int maxResultsPerQuery = INTERNAL_SYNCHRONOUS_SEARCH_SIZE + 1;
final int division = expectedNumResults / maxResultsPerQuery;
return division + 1;
}
private static void assertIdsEqual(List<IIdType> expectedResourceIds, List<? extends IIdType> actualResourceIds) {
assertEquals(expectedResourceIds.size(), actualResourceIds.size());

View File

@ -1,10 +1,9 @@
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
@ -12,15 +11,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep.DEFAULT_PAGE_SIZE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("unchecked")
@TestMethodOrder(value = MethodOrderer.MethodName.class)
public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
@ -55,14 +52,12 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, null);
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(start, end, null, null);
// Verify
assertEquals(3, page.size());
assertThat(page.getTypedResourcePids(), contains(new TypedResourcePid("Patient", id0), new TypedResourcePid("Patient", id1), new TypedResourcePid("Observation", id2)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end));
List<TypedResourcePid> typedPids = queryStream.visitStream(Stream::toList);
assertEquals(3, typedPids.size());
assertThat(typedPids, contains(new TypedResourcePid("Patient", id0), new TypedResourcePid("Patient", id1), new TypedResourcePid("Observation", id2)));
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
@ -85,13 +80,12 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, null);
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(start, end, null, null);
// Verify
List<TypedResourcePid> typedPids = queryStream.visitStream(Stream::toList);
assertTrue(page.isEmpty());
assertEquals(0, page.size());
assertNull(page.getLastDate());
assertTrue(typedPids.isEmpty());
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
@ -133,19 +127,16 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, "Patient?active=false");
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(start, end, null, "Patient?active=false");
// Verify
List<TypedResourcePid> typedResourcePids = queryStream.visitStream(Stream::toList);
assertEquals(4, page.size());
List<TypedResourcePid> typedResourcePids = page.getTypedResourcePids();
assertThat(page.getTypedResourcePids(),
contains(new TypedResourcePid("Patient", patientId0),
assertEquals(2, typedResourcePids.size());
assertThat(typedResourcePids,
contains(
new TypedResourcePid("Patient", patientId1),
new TypedResourcePid("Patient", patientId2),
new TypedResourcePid("Patient", patientId3)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end) || page.getLastDate().equals(end));
new TypedResourcePid("Patient", patientId2)));
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());

View File

@ -38,13 +38,15 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@SuppressWarnings("SqlSourceToSinkFlow")
public class ConnectionWrapper implements Connection {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ConnectionWrapper.class);
private Connection myWrap;
private final Connection myWrap;
public ConnectionWrapper(Connection theConnection) {
ourLog.trace("new connection - {}", theConnection);
myWrap = theConnection;
}
@ -60,6 +62,7 @@ public class ConnectionWrapper implements Connection {
@Override
public void close() throws SQLException {
ourLog.trace("close connection - {}", myWrap);
myWrap.close();
}

View File

@ -21,10 +21,9 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import java.util.Date;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
@ -35,19 +34,6 @@ import javax.annotation.Nullable;
* @param <IT> This parameter defines constraints on the types of pids we are pulling (e.g. resource type, url, etc).
*/
public interface IIdChunkProducer<IT extends ChunkRangeJson> {
/**
* Actually fetch the resource pids
* @param theNextStart pids are pulled with lastUpdated >= this date
* @param theEnd pids are pulled with lastUpdate <= this date
* @param thePageSize the number of pids to query at a time
* @param theRequestPartitionId partition for operation if rtequired
* @param theData defines the query we are using
* @return a list of Resource pids
*/
IResourcePidList fetchResourceIdsPage(
Date theNextStart,
Date theEnd,
@Nonnull Integer thePageSize,
@Nullable RequestPartitionId theRequestPartitionId,
IT theData);
IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, IT theData);
}

View File

@ -22,15 +22,16 @@ package ca.uhn.fhir.batch2.jobs.step;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import java.util.Date;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class PartitionedUrlListIdChunkProducer implements IIdChunkProducer<PartitionedUrlChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IBatch2DaoSvc myBatch2DaoSvc;
@ -40,29 +41,27 @@ public class PartitionedUrlListIdChunkProducer implements IIdChunkProducer<Parti
}
@Override
public IResourcePidList fetchResourceIdsPage(
Date theNextStart,
public IResourcePidStream fetchResourceIdStream(
Date theStart,
Date theEnd,
@Nonnull Integer thePageSize,
@Nullable RequestPartitionId theRequestPartitionId,
PartitionedUrlChunkRangeJson theData) {
PartitionedUrl partitionedUrl = theData.getPartitionedUrl();
RequestPartitionId targetPartitionId;
String theUrl;
if (partitionedUrl == null) {
ourLog.info("Fetching resource ID chunk for everything - Range {} - {}", theNextStart, theEnd);
return myBatch2DaoSvc.fetchResourceIdsPage(theNextStart, theEnd, thePageSize, theRequestPartitionId, null);
theUrl = null;
targetPartitionId = theRequestPartitionId;
ourLog.info("Fetching resource ID chunk for everything - Range {} - {}", theStart, theEnd);
} else {
theUrl = partitionedUrl.getUrl();
targetPartitionId = defaultIfNull(partitionedUrl.getRequestPartitionId(), theRequestPartitionId);
ourLog.info(
"Fetching resource ID chunk for URL {} - Range {} - {}",
partitionedUrl.getUrl(),
theNextStart,
theEnd);
RequestPartitionId requestPartitionId = partitionedUrl.getRequestPartitionId();
if (requestPartitionId == null) {
requestPartitionId = theRequestPartitionId;
}
return myBatch2DaoSvc.fetchResourceIdsPage(
theNextStart, theEnd, thePageSize, requestPartitionId, partitionedUrl.getUrl());
"Fetching resource ID chunk for URL {} - Range {} - {}", partitionedUrl.getUrl(), theStart, theEnd);
}
return myBatch2DaoSvc.fetchResourceIdStream(theStart, theEnd, targetPartitionId, theUrl);
}
}

View File

@ -29,20 +29,19 @@ import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.util.Logs;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import static ca.uhn.fhir.util.StreamUtil.partition;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends ChunkRangeJson>
implements IJobStepWorker<PT, IT, ResourceIdListWorkChunkJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -67,49 +66,32 @@ public class ResourceIdListStep<PT extends PartitionedJobParameters, IT extends
Date start = data.getStart();
Date end = data.getEnd();
Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize();
int pageSize = DEFAULT_PAGE_SIZE;
if (batchSize != null) {
pageSize = batchSize.intValue();
}
ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
RequestPartitionId requestPartitionId =
theStepExecutionDetails.getParameters().getRequestPartitionId();
int totalIdsFound = 0;
int chunkCount = 0;
int maxBatchId = MAX_BATCH_OF_IDS;
if (batchSize != null) {
// we won't go over MAX_BATCH_OF_IDS
maxBatchId = Math.min(batchSize.intValue(), maxBatchId);
}
int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS);
final IResourcePidList nextChunk = myIdChunkProducer.fetchResourceIdsPage(
start, end, pageSize, requestPartitionId, theStepExecutionDetails.getData());
final IResourcePidStream searchResult = myIdChunkProducer.fetchResourceIdStream(
start, end, requestPartitionId, theStepExecutionDetails.getData());
if (nextChunk.isEmpty()) {
ourLog.info("No data returned");
} else {
ourLog.debug("Found {} IDs from {} to {}", nextChunk.size(), start, nextChunk.getLastDate());
searchResult.visitStreamNoResult(typedResourcePidStream -> {
AtomicInteger totalIdsFound = new AtomicInteger();
AtomicInteger chunkCount = new AtomicInteger();
final Set<TypedPidJson> idBuffer = nextChunk.getTypedResourcePids().stream()
.map(TypedPidJson::new)
.collect(Collectors.toCollection(LinkedHashSet::new));
final UnmodifiableIterator<List<TypedPidJson>> partition =
Iterators.partition(idBuffer.iterator(), maxBatchId);
while (partition.hasNext()) {
final List<TypedPidJson> submissionIds = partition.next();
totalIdsFound += submissionIds.size();
chunkCount++;
submitWorkChunk(submissionIds, nextChunk.getRequestPartitionId(), theDataSink);
}
Stream<TypedPidJson> jsonStream = typedResourcePidStream.map(TypedPidJson::new);
// chunk by size maxBatchId and submit the batches
partition(jsonStream, chunkSize).forEach(idBatch -> {
totalIdsFound.addAndGet(idBatch.size());
chunkCount.getAndIncrement();
submitWorkChunk(idBatch, searchResult.getRequestPartitionId(), theDataSink);
});
ourLog.info("Submitted {} chunks with {} resource IDs", chunkCount, totalIdsFound);
}
});
return RunOutcome.SUCCESS;
}

View File

@ -8,6 +8,8 @@ import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
@ -25,7 +27,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep.DEFAULT_PAGE_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@ -68,7 +69,7 @@ public class LoadIdsStepTest {
// First Execution
when(myBatch2DaoSvc.fetchResourceIdsPage(eq(DATE_1), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
when(myBatch2DaoSvc.fetchResourceIdStream(eq(DATE_1), eq(DATE_END), isNull(), isNull()))
.thenReturn(createIdChunk(0L, 20000L, DATE_2));
mySvc.run(details, mySink);
@ -96,14 +97,14 @@ public class LoadIdsStepTest {
}
@Nonnull
private IResourcePidList createIdChunk(long idLow, long idHigh, Date lastDate) {
private IResourcePidStream createIdChunk(long idLow, long idHigh, Date lastDate) {
List<IResourcePersistentId> ids = new ArrayList<>();
List<String> resourceTypes = new ArrayList<>();
for (long i = idLow; i < idHigh; i++) {
ids.add(JpaPid.fromId(i));
}
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, lastDate, null);
return chunk;
return new ListWrappingPidStream(chunk);
}
}

View File

@ -7,7 +7,8 @@ import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -20,7 +21,6 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -58,15 +58,13 @@ class ResourceIdListStepTest {
@ParameterizedTest
@ValueSource(ints = {0, 1, 100, 500, 501, 2345, 10500})
void testResourceIdListBatchSizeLimit(int theListSize) {
List<TypedResourcePid> idList = generateIdList(theListSize);
List<IResourcePersistentId> idList = generateIdList(theListSize);
when(myStepExecutionDetails.getData()).thenReturn(myData);
when(myParameters.getBatchSize()).thenReturn(theListSize);
when(myParameters.getBatchSize()).thenReturn(500);
when(myStepExecutionDetails.getParameters()).thenReturn(myParameters);
HomogeneousResourcePidList homogeneousResourcePidList = mock(HomogeneousResourcePidList.class);
IResourcePidStream mockStream = new ListWrappingPidStream(
new HomogeneousResourcePidList("Patient", idList, null, null));
if (theListSize > 0) {
when(homogeneousResourcePidList.getTypedResourcePids()).thenReturn(idList);
when(homogeneousResourcePidList.getLastDate()).thenReturn(new Date());
when(homogeneousResourcePidList.isEmpty()).thenReturn(false);
// Ensure none of the work chunks exceed MAX_BATCH_OF_IDS in size:
doAnswer(i -> {
ResourceIdListWorkChunkJson list = i.getArgument(0);
@ -74,12 +72,9 @@ class ResourceIdListStepTest {
"Id batch size should never exceed " + ResourceIdListStep.MAX_BATCH_OF_IDS);
return null;
}).when(myDataSink).accept(any(ResourceIdListWorkChunkJson.class));
} else {
when(homogeneousResourcePidList.isEmpty()).thenReturn(true);
}
when(myIdChunkProducer.fetchResourceIdsPage(any(), any(), any(), any(), any()))
.thenReturn(homogeneousResourcePidList);
when(myIdChunkProducer.fetchResourceIdStream(any(), any(), any(), any()))
.thenReturn(mockStream);
final RunOutcome run = myResourceIdListStep.run(myStepExecutionDetails, myDataSink);
assertNotEquals(null, run);
@ -103,13 +98,12 @@ class ResourceIdListStepTest {
}
}
private List<TypedResourcePid> generateIdList(int theListSize) {
List<TypedResourcePid> idList = new ArrayList<>();
private List<IResourcePersistentId> generateIdList(int theListSize) {
List<IResourcePersistentId> idList = new ArrayList<>();
for (int id = 0; id < theListSize; id++) {
IResourcePersistentId theId = mock(IResourcePersistentId.class);
IResourcePersistentId<?> theId = mock(IResourcePersistentId.class);
when(theId.toString()).thenReturn(Integer.toString(id + 1));
TypedResourcePid typedId = new TypedResourcePid("Patient", theId);
idList.add(typedId);
idList.add(theId);
}
return idList;
}

View File

@ -21,13 +21,13 @@ package ca.uhn.fhir.mdm.batch2;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(MdmIdChunkProducer.class);
@ -38,21 +38,17 @@ public class MdmIdChunkProducer implements IIdChunkProducer<MdmChunkRangeJson> {
}
@Override
public IResourcePidList fetchResourceIdsPage(
Date theNextStart,
Date theEnd,
@Nonnull Integer thePageSize,
RequestPartitionId theRequestPartitionId,
MdmChunkRangeJson theData) {
public IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, MdmChunkRangeJson theData) {
String resourceType = theData.getResourceType();
ourLog.info(
"Fetching golden resource ID chunk for resource type {} - Range {} - {}",
resourceType,
theNextStart,
theStart,
theEnd);
return myGoldenResourceSearchSvc.fetchGoldenResourceIdsPage(
theNextStart, theEnd, thePageSize, theRequestPartitionId, resourceType);
return myGoldenResourceSearchSvc.fetchGoldenResourceIdStream(
theStart, theEnd, theRequestPartitionId, resourceType);
}
}

View File

@ -51,6 +51,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
@ -356,6 +357,22 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
return searchForIds(theParams, theRequest);
}
/**
* Search results matching theParams.
* The Stream MUST be called within a transaction because the stream wraps an open query ResultSet.
* The Stream MUST be closed to avoid leaking resources.
* @param theParams the search
* @param theRequest for partition target info
* @return a Stream than MUST only be used within the calling transaction.
*/
default <PID extends IResourcePersistentId<?>> Stream<PID> searchForIdStream(
SearchParameterMap theParams,
RequestDetails theRequest,
@Nullable IBaseResource theConditionalOperationTargetOrNull) {
List<PID> iResourcePersistentIds = searchForIds(theParams, theRequest);
return iResourcePersistentIds.stream();
}
/**
* Takes a map of incoming raw search parameters and translates/parses them into
* appropriate {@link IQueryParameterType} instances of the appropriate type

View File

@ -0,0 +1,45 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Template for wrapping access to stream supplier in a try-with-resources block.
*/
class AutoClosingStreamTemplate<T> implements StreamTemplate<T> {
private final Supplier<Stream<T>> myStreamQuery;
AutoClosingStreamTemplate(Supplier<Stream<T>> theStreamQuery) {
myStreamQuery = theStreamQuery;
}
@Nullable
@Override
public <R> R call(@Nonnull Function<Stream<T>, R> theCallback) {
try (Stream<T> stream = myStreamQuery.get()) {
return theCallback.apply(stream);
}
}
}

View File

@ -22,7 +22,11 @@ package ca.uhn.fhir.jpa.api.pid;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -36,7 +40,9 @@ public abstract class BaseResourcePidList implements IResourcePidList {
private final RequestPartitionId myRequestPartitionId;
BaseResourcePidList(
Collection<IResourcePersistentId> theIds, Date theLastDate, RequestPartitionId theRequestPartitionId) {
Collection<? extends IResourcePersistentId> theIds,
Date theLastDate,
RequestPartitionId theRequestPartitionId) {
myIds.addAll(theIds);
myLastDate = theLastDate;
myRequestPartitionId = theRequestPartitionId;

View File

@ -0,0 +1,45 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* Wrapper for a query result stream.
*/
public interface IResourcePidStream {
<T> T visitStream(Function<Stream<TypedResourcePid>, T> theCallback);
default void visitStreamNoResult(Consumer<Stream<TypedResourcePid>> theCallback) {
visitStream(theStream -> {
theCallback.accept(theStream);
return null;
});
}
/**
* The partition info for the query.
*/
RequestPartitionId getRequestPartitionId();
}

View File

@ -0,0 +1,47 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import java.util.function.Function;
import java.util.stream.Stream;
public class ListWrappingPidStream implements IResourcePidStream {
private final IResourcePidList myList;
public ListWrappingPidStream(IResourcePidList theList) {
myList = theList;
}
public Stream<TypedResourcePid> getTypedResourcePidStream() {
return myList.getTypedResourcePids().stream();
}
@Override
public <T> T visitStream(Function<Stream<TypedResourcePid>, T> theCallback) {
return theCallback.apply(getTypedResourcePidStream());
}
@Override
public RequestPartitionId getRequestPartitionId() {
return myList.getRequestPartitionId();
}
}

View File

@ -36,7 +36,7 @@ public class MixedResourcePidList extends BaseResourcePidList {
public MixedResourcePidList(
List<String> theResourceTypes,
Collection<IResourcePersistentId> theIds,
Collection<? extends IResourcePersistentId> theIds,
Date theLastDate,
RequestPartitionId theRequestPartitionId) {
super(theIds, theLastDate, theRequestPartitionId);

View File

@ -0,0 +1,60 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import org.springframework.transaction.support.TransactionOperations;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* A template for stream queries, like JDBCTemplate and friends.
*
* We need to wrap access to the stream with a tx-span, a try-with-resources block, and RequestDetails.
* @param <T> The stream content type
*/
public interface StreamTemplate<T> {
@Nullable
<R> R call(@Nonnull Function<Stream<T>, R> theCallback);
/**
* Wrap this template with a transaction boundary.
* Our dao Stream methods require an active Hibernate session for the duration of the Stream.
* This advice uses a tx boundary to ensure that active session.
*
* @param theTxBuilder the transaction and partition settings
* @return the wrapped template
*/
default StreamTemplate<T> withTransactionAdvice(TransactionOperations theTxBuilder) {
return new TransactionWrappingStreamTemplate<>(theTxBuilder, this);
}
/**
* Wrap the supplied stream as a StreamTemplate in a try-with-resources block to ensure it is closed.
* @param theStreamQuery the query to run
* @return a template that will always close the Stream on exit.
*/
static <ST> StreamTemplate<ST> fromSupplier(Supplier<Stream<ST>> theStreamQuery) {
return new AutoClosingStreamTemplate<>(theStreamQuery);
}
}

View File

@ -0,0 +1,52 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import org.springframework.transaction.support.TransactionOperations;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Wrap a StreamTemplate with transaction advice.
* We can't cary open ResultSets past a transaction boundary.
* This wraps a Stream producer with tx advice so the connection is still open.
*/
class TransactionWrappingStreamTemplate<T> implements StreamTemplate<T> {
@Nonnull
final TransactionOperations myTransaction;
@Nonnull
final StreamTemplate<T> myWrappedStreamTemplate;
TransactionWrappingStreamTemplate(
@Nonnull TransactionOperations theTransaction, @Nonnull StreamTemplate<T> theWrappedStreamTemplate) {
myTransaction = theTransaction;
this.myWrappedStreamTemplate = theWrappedStreamTemplate;
}
@Nullable
@Override
public <R> R call(@Nonnull Function<Stream<T>, R> theCallback) {
return myTransaction.execute(unusedTxStatus -> myWrappedStreamTemplate.call(theCallback));
}
}

View File

@ -0,0 +1,47 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.pid;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import java.util.function.Function;
import java.util.stream.Stream;
public class TypedResourceStream implements IResourcePidStream {
private final RequestPartitionId myRequestPartitionId;
private final StreamTemplate<TypedResourcePid> myStreamSupplier;
public TypedResourceStream(
RequestPartitionId theRequestPartitionId, StreamTemplate<TypedResourcePid> theStreamSupplier) {
myRequestPartitionId = theRequestPartitionId;
myStreamSupplier = theStreamSupplier;
}
@Override
public <T> T visitStream(Function<Stream<TypedResourcePid>, T> theCallback) {
return myStreamSupplier.call(theCallback);
}
@Override
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
}

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.api.svc;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.ListWrappingPidStream;
import java.util.Date;
import javax.annotation.Nonnull;
@ -68,4 +70,10 @@ public interface IBatch2DaoSvc {
@Nullable String theUrl) {
return fetchResourceIdsPage(theStart, theEnd, theRequestPartitionId, theUrl);
}
default IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, RequestPartitionId theTargetPartitionId, String theUrl) {
return new ListWrappingPidStream(fetchResourceIdsPage(
theStart, theEnd, 20000 /* ResourceIdListStep.DEFAULT_PAGE_SIZE */, theTargetPartitionId, theUrl));
}
}

View File

@ -20,7 +20,7 @@
package ca.uhn.fhir.jpa.api.svc;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import java.util.Date;
import javax.annotation.Nonnull;
@ -29,18 +29,16 @@ import javax.annotation.Nullable;
public interface IGoldenResourceSearchSvc {
/**
* Fetches a page of resource IDs for golden resources of the given type. The page size is up to the discretion of the implementation.
* Fetches a cursor of resource IDs for golden resources of the given type.
*
* @param theStart The start of the date range, must be inclusive.
* @param theEnd The end of the date range, should be exclusive.
* @param thePageSize The number of golden resources to request at a time.
* @param theRequestPartitionId The request partition ID (may be <code>null</code> on nonpartitioned systems)
* @param theResourceType the type of resource.
*/
IResourcePidList fetchGoldenResourceIdsPage(
IResourcePidStream fetchGoldenResourceIdStream(
Date theStart,
Date theEnd,
@Nonnull Integer thePageSize,
@Nullable RequestPartitionId theRequestPartitionId,
@Nullable String theResourceType);
@Nonnull String theResourceType);
}

View File

@ -36,7 +36,7 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
public interface ISearchBuilder<T extends IResourcePersistentId> {
public interface ISearchBuilder<T extends IResourcePersistentId<?>> {
String SEARCH_BUILDER_BEAN_NAME = "SearchBuilder";
IResultIterator createQuery(

View File

@ -53,6 +53,7 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionOperations;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
@ -402,7 +403,7 @@ public class HapiTransactionService implements IHapiTransactionService {
}
}
protected class ExecutionBuilder implements IExecutionBuilder {
protected class ExecutionBuilder implements IExecutionBuilder, TransactionOperations {
private final RequestDetails myRequestDetails;
private Isolation myIsolation;
@ -473,7 +474,7 @@ public class HapiTransactionService implements IHapiTransactionService {
}
@Override
public <T> T execute(TransactionCallback<T> callback) {
public <T> T execute(@Nonnull TransactionCallback<T> callback) {
assert callback != null;
return doExecute(this, callback);

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.util.ICallable;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionOperations;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
@ -80,7 +81,7 @@ public interface IHapiTransactionService {
@Nonnull Isolation theIsolation,
@Nonnull ICallable<T> theCallback);
interface IExecutionBuilder {
interface IExecutionBuilder extends TransactionOperations {
IExecutionBuilder withIsolation(Isolation theIsolation);
@ -98,6 +99,6 @@ public interface IHapiTransactionService {
<T> T execute(Callable<T> theTask);
<T> T execute(TransactionCallback<T> callback);
<T> T execute(@Nonnull TransactionCallback<T> callback);
}
}

View File

@ -0,0 +1,65 @@
package ca.uhn.fhir.jpa.api.pid;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
class AutoClosingStreamTemplateTest {
@Test
void templatePassesStreamToCallback() {
// given
Stream<String> concreteStream = Stream.of("one", "two");
StreamTemplate<String> streamTemplate = StreamTemplate.fromSupplier(() -> concreteStream);
// when
streamTemplate.call(s -> {
assertSame(concreteStream, s);
return 0;
});
}
@Test
void templateClosesStreamOnExit() {
// given
AtomicBoolean wasClosed = new AtomicBoolean(false);
Stream<String> concreteStream = Stream.of("one", "two")
.onClose(()->wasClosed.set(true));
StreamTemplate<String> streamTemplate = StreamTemplate.fromSupplier(() -> concreteStream);
// when
streamTemplate.call(s -> {
// don't touch the stream;
return 0;
});
assertTrue(wasClosed.get(), "stream was closed");
}
@Test
void templateClosesStreamOnException() {
// given
AtomicBoolean wasClosed = new AtomicBoolean(false);
Stream<String> concreteStream = Stream.of("one", "two")
.onClose(()->wasClosed.set(true));
StreamTemplate<String> streamTemplate = StreamTemplate.fromSupplier(() -> concreteStream);
// when
try {
streamTemplate.call(s -> {
throw new RuntimeException("something failed");
});
} catch (RuntimeException e) {
// expected;
}
assertTrue(wasClosed.get(), "stream was closed");
}
}