Merge branch 'master' into fix-batch-pool-breaking

This commit is contained in:
Tadgh 2021-09-09 11:28:44 -04:00
commit d5f3b5db18
16 changed files with 299 additions and 173 deletions

View File

@ -24,18 +24,23 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker; import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -52,17 +57,19 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
DaoRegistry myDaoRegistry; DaoRegistry myDaoRegistry;
@Autowired @Autowired
IResourceTableDao myResourceTableDao; IResourceTableDao myResourceTableDao;
@Autowired
IdHelperService myIdHelperService;
@Override @Override
@Nonnull @Nonnull
public ResourceVersionMap getVersionMap(String theResourceName, SearchParameterMap theSearchParamMap) { public ResourceVersionMap getVersionMap(RequestPartitionId theRequestPartitionId, String theResourceName, SearchParameterMap theSearchParamMap) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceName); IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceName);
if (ourLog.isDebugEnabled()) { if (ourLog.isDebugEnabled()) {
ourLog.debug("About to retrieve version map for resource type: {}", theResourceName); ourLog.debug("About to retrieve version map for resource type: {}", theResourceName);
} }
List<Long> matchingIds = dao.searchForIds(theSearchParamMap, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.allPartitions())).stream() List<Long> matchingIds = dao.searchForIds(theSearchParamMap, new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId)).stream()
.map(ResourcePersistentId::getIdAsLong) .map(ResourcePersistentId::getIdAsLong)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -74,4 +81,95 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc {
return ResourceVersionMap.fromResourceTableEntities(allById); return ResourceVersionMap.fromResourceTableEntities(allById);
} }
@Override
/**
* Retrieves the latest versions for any resourceid that are found.
* If they are not found, they will not be contained in the returned map.
* The key should be the same value that was passed in to allow
* consumer to look up the value using the id they already have.
*
* This method should not throw, so it can safely be consumed in
* transactions.
*
* @param theRequestPartitionId - request partition id
* @param theIds - list of IIdTypes for resources of interest.
* @return
*/
public ResourcePersistentIdMap getLatestVersionIdsForResourceIds(RequestPartitionId theRequestPartitionId, List<IIdType> theIds) {
ResourcePersistentIdMap idToPID = new ResourcePersistentIdMap();
HashMap<String, List<IIdType>> resourceTypeToIds = new HashMap<>();
for (IIdType id : theIds) {
String resourceType = id.getResourceType();
if (!resourceTypeToIds.containsKey(resourceType)) {
resourceTypeToIds.put(resourceType, new ArrayList<>());
}
resourceTypeToIds.get(resourceType).add(id);
}
for (String resourceType : resourceTypeToIds.keySet()) {
ResourcePersistentIdMap idAndPID = getIdsOfExistingResources(theRequestPartitionId,
resourceTypeToIds.get(resourceType));
idToPID.putAll(idAndPID);
}
return idToPID;
}
/**
* Helper method to determine if some resources exist in the DB (without throwing).
* Returns a set that contains the IIdType for every resource found.
* If it's not found, it won't be included in the set.
*
* @param theIds - list of IIdType ids (for the same resource)
* @return
*/
private ResourcePersistentIdMap getIdsOfExistingResources(RequestPartitionId thePartitionId,
Collection<IIdType> theIds) {
// these are the found Ids that were in the db
ResourcePersistentIdMap retval = new ResourcePersistentIdMap();
if (theIds == null || theIds.isEmpty()) {
return retval;
}
List<ResourcePersistentId> resourcePersistentIds = myIdHelperService.resolveResourcePersistentIdsWithCache(thePartitionId,
theIds.stream().collect(Collectors.toList()));
// we'll use this map to fetch pids that require versions
HashMap<Long, ResourcePersistentId> pidsToVersionToResourcePid = new HashMap<>();
// fill in our map
for (ResourcePersistentId pid : resourcePersistentIds) {
if (pid.getVersion() == null) {
pidsToVersionToResourcePid.put(pid.getIdAsLong(), pid);
}
Optional<IIdType> idOp = theIds.stream()
.filter(i -> i.getIdPart().equals(pid.getAssociatedResourceId().getIdPart()))
.findFirst();
// this should always be present
// since it was passed in.
// but land of optionals...
idOp.ifPresent(id -> {
retval.put(id, pid);
});
}
// set any versions we don't already have
if (!pidsToVersionToResourcePid.isEmpty()) {
Collection<Object[]> resourceEntries = myResourceTableDao
.getResourceVersionsForPid(new ArrayList<>(pidsToVersionToResourcePid.keySet()));
for (Object[] record : resourceEntries) {
// order matters!
Long retPid = (Long) record[0];
String resType = (String) record[1];
Long version = (Long) record[2];
pidsToVersionToResourcePid.get(retPid).setVersion(version);
}
}
return retval;
}
} }

View File

@ -30,7 +30,8 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.LazyDaoMethodOutcome; import ca.uhn.fhir.jpa.api.model.LazyDaoMethodOutcome;
import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@ -94,7 +95,7 @@ public abstract class BaseStorageDao {
@Autowired @Autowired
protected ModelConfig myModelConfig; protected ModelConfig myModelConfig;
@Autowired @Autowired
protected IdHelperService myIdHelperService; protected IResourceVersionSvc myResourceVersionSvc;
@Autowired @Autowired
protected DaoConfig myDaoConfig; protected DaoConfig myDaoConfig;
@ -211,7 +212,7 @@ public abstract class BaseStorageDao {
IIdType referenceElement = nextReference.getReferenceElement(); IIdType referenceElement = nextReference.getReferenceElement();
if (!referenceElement.hasBaseUrl()) { if (!referenceElement.hasBaseUrl()) {
Map<IIdType, ResourcePersistentId> idToPID = myIdHelperService.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(), ResourcePersistentIdMap resourceVersionMap = myResourceVersionSvc.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(),
Collections.singletonList(referenceElement) Collections.singletonList(referenceElement)
); );
@ -220,12 +221,11 @@ public abstract class BaseStorageDao {
// 2) no resource exists, but we will create one (eventually). The version is 1 // 2) no resource exists, but we will create one (eventually). The version is 1
// 3) no resource exists, and none will be made -> throw // 3) no resource exists, and none will be made -> throw
Long version; Long version;
if (idToPID.containsKey(referenceElement)) { if (resourceVersionMap.containsKey(referenceElement)) {
// the resource exists... latest id // the resource exists... latest id
// will be the value in the ResourcePersistentId // will be the value in the ResourcePersistentId
version = idToPID.get(referenceElement).getVersion(); version = resourceVersionMap.getResourcePersistentId(referenceElement).getVersion();
} } else if (myDaoConfig.isAutoCreatePlaceholderReferenceTargets()) {
else if (myDaoConfig.isAutoCreatePlaceholderReferenceTargets()) {
// if idToPID doesn't contain object // if idToPID doesn't contain object
// but autcreateplaceholders is on // but autcreateplaceholders is on
// then the version will be 1 (the first version) // then the version will be 1 (the first version)

View File

@ -35,7 +35,8 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.DeleteConflict; import ca.uhn.fhir.jpa.api.model.DeleteConflict;
import ca.uhn.fhir.jpa.api.model.DeleteConflictList; import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.delete.DeleteConflictService; import ca.uhn.fhir.jpa.delete.DeleteConflictService;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource; import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
@ -54,7 +55,6 @@ import ca.uhn.fhir.rest.api.PreferReturnEnum;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.DeferredInterceptorBroadcasts; import ca.uhn.fhir.rest.api.server.storage.DeferredInterceptorBroadcasts;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.param.ParameterUtil; import ca.uhn.fhir.rest.param.ParameterUtil;
import ca.uhn.fhir.rest.server.RestfulServerUtils; import ca.uhn.fhir.rest.server.RestfulServerUtils;
@ -80,7 +80,6 @@ import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap; import com.google.common.collect.ListMultimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu3.model.Bundle; import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.exceptions.FHIRException; import org.hl7.fhir.exceptions.FHIRException;
@ -116,7 +115,6 @@ import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -162,7 +160,7 @@ public abstract class BaseTransactionProcessor {
private TaskExecutor myExecutor ; private TaskExecutor myExecutor ;
@Autowired @Autowired
private IdHelperService myIdHelperService; private IResourceVersionSvc myResourceVersionSvc;
@VisibleForTesting @VisibleForTesting
public void setDaoConfig(DaoConfig theDaoConfig) { public void setDaoConfig(DaoConfig theDaoConfig) {
@ -1273,25 +1271,24 @@ public abstract class BaseTransactionProcessor {
// get a map of // get a map of
// existing ids -> PID (for resources that exist in the DB) // existing ids -> PID (for resources that exist in the DB)
// should this be allPartitions? // should this be allPartitions?
Map<IIdType, ResourcePersistentId> idToPID = myIdHelperService.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(), ResourcePersistentIdMap resourceVersionMap = myResourceVersionSvc.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(),
theReferencesToAutoVersion.stream() theReferencesToAutoVersion.stream()
.map(ref -> ref.getReferenceElement()).collect(Collectors.toList())); .map(IBaseReference::getReferenceElement).collect(Collectors.toList()));
for (IBaseReference baseRef : theReferencesToAutoVersion) { for (IBaseReference baseRef : theReferencesToAutoVersion) {
IIdType id = baseRef.getReferenceElement(); IIdType id = baseRef.getReferenceElement();
if (!idToPID.containsKey(id) if (!resourceVersionMap.containsKey(id)
&& myDaoConfig.isAutoCreatePlaceholderReferenceTargets()) { && myDaoConfig.isAutoCreatePlaceholderReferenceTargets()) {
// not in the db, but autocreateplaceholders is true // not in the db, but autocreateplaceholders is true
// so the version we'll set is "1" (since it will be // so the version we'll set is "1" (since it will be
// created later) // created later)
String newRef = id.withVersion("1").getValue(); String newRef = id.withVersion("1").getValue();
id.setValue(newRef); id.setValue(newRef);
} } else {
else {
// we will add the looked up info to the transaction // we will add the looked up info to the transaction
// for later // for later
theTransactionDetails.addResolvedResourceId(id, theTransactionDetails.addResolvedResourceId(id,
idToPID.get(id)); resourceVersionMap.getResourcePersistentId(id));
} }
} }
@ -1688,12 +1685,12 @@ public abstract class BaseTransactionProcessor {
public class RetriableBundleTask implements Runnable { public class RetriableBundleTask implements Runnable {
private CountDownLatch myCompletedLatch; private final CountDownLatch myCompletedLatch;
private RequestDetails myRequestDetails; private final RequestDetails myRequestDetails;
private IBase myNextReqEntry; private final IBase myNextReqEntry;
private Map<Integer, Object> myResponseMap; private final Map<Integer, Object> myResponseMap;
private int myResponseOrder; private final int myResponseOrder;
private boolean myNestedMode; private final boolean myNestedMode;
private Throwable myLastSeenException; private Throwable myLastSeenException;
protected RetriableBundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) { protected RetriableBundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
@ -1708,7 +1705,7 @@ public abstract class BaseTransactionProcessor {
private void processBatchEntry() { private void processBatchEntry() {
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode()); IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry); myVersionAdapter.addEntry(subRequestBundle, myNextReqEntry);
IBaseBundle nextResponseBundle = processTransactionAsSubRequest(myRequestDetails, subRequestBundle, "Batch sub-request", myNestedMode); IBaseBundle nextResponseBundle = processTransactionAsSubRequest(myRequestDetails, subRequestBundle, "Batch sub-request", myNestedMode);
@ -1752,7 +1749,7 @@ public abstract class BaseTransactionProcessor {
} }
// checking for the parallelism // checking for the parallelism
ourLog.debug("Processing batch entry for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase)myNextReqEntry)); ourLog.debug("processing batch for {} is completed", myVersionAdapter.getEntryRequestUrl(myNextReqEntry));
myCompletedLatch.countDown(); myCompletedLatch.countDown();
} }

View File

@ -303,7 +303,7 @@ public class IdHelperService {
if (forcedId.isPresent()) { if (forcedId.isPresent()) {
retVal.setValue(theResourceType + '/' + forcedId.get()); retVal.setValue(theResourceType + '/' + forcedId.get());
} else { } else {
retVal.setValue(theResourceType + '/' + theId.toString()); retVal.setValue(theResourceType + '/' + theId);
} }
return retVal; return retVal;
@ -530,95 +530,6 @@ public class IdHelperService {
return retVal; return retVal;
} }
/**
* Helper method to determine if some resources exist in the DB (without throwing).
* Returns a set that contains the IIdType for every resource found.
* If it's not found, it won't be included in the set.
* @param theIds - list of IIdType ids (for the same resource)
* @return
*/
private Map<IIdType, ResourcePersistentId> getIdsOfExistingResources(RequestPartitionId thePartitionId,
Collection<IIdType> theIds) {
// these are the found Ids that were in the db
HashMap<IIdType, ResourcePersistentId> collected = new HashMap<>();
if (theIds == null || theIds.isEmpty()) {
return collected;
}
List<ResourcePersistentId> resourcePersistentIds = resolveResourcePersistentIdsWithCache(thePartitionId,
theIds.stream().collect(Collectors.toList()));
// we'll use this map to fetch pids that require versions
HashMap<Long, ResourcePersistentId> pidsToVersionToResourcePid = new HashMap<>();
// fill in our map
for (ResourcePersistentId pid : resourcePersistentIds) {
if (pid.getVersion() == null) {
pidsToVersionToResourcePid.put(pid.getIdAsLong(), pid);
}
Optional<IIdType> idOp = theIds.stream()
.filter(i -> i.getIdPart().equals(pid.getAssociatedResourceId().getIdPart()))
.findFirst();
// this should always be present
// since it was passed in.
// but land of optionals...
idOp.ifPresent(id -> {
collected.put(id, pid);
});
}
// set any versions we don't already have
if (!pidsToVersionToResourcePid.isEmpty()) {
Collection<Object[]> resourceEntries = myResourceTableDao
.getResourceVersionsForPid(new ArrayList<>(pidsToVersionToResourcePid.keySet()));
for (Object[] record : resourceEntries) {
// order matters!
Long retPid = (Long)record[0];
String resType = (String)record[1];
Long version = (Long)record[2];
pidsToVersionToResourcePid.get(retPid).setVersion(version);
}
}
return collected;
}
/**
* Retrieves the latest versions for any resourceid that are found.
* If they are not found, they will not be contained in the returned map.
* The key should be the same value that was passed in to allow
* consumer to look up the value using the id they already have.
*
* This method should not throw, so it can safely be consumed in
* transactions.
*
* @param theRequestPartitionId - request partition id
* @param theIds - list of IIdTypes for resources of interest.
* @return
*/
public Map<IIdType, ResourcePersistentId> getLatestVersionIdsForResourceIds(RequestPartitionId theRequestPartitionId, Collection<IIdType> theIds) {
HashMap<IIdType, ResourcePersistentId> idToPID = new HashMap<>();
HashMap<String, List<IIdType>> resourceTypeToIds = new HashMap<>();
for (IIdType id : theIds) {
String resourceType = id.getResourceType();
if (!resourceTypeToIds.containsKey(resourceType)) {
resourceTypeToIds.put(resourceType, new ArrayList<>());
}
resourceTypeToIds.get(resourceType).add(id);
}
for (String resourceType : resourceTypeToIds.keySet()) {
Map<IIdType, ResourcePersistentId> idAndPID = getIdsOfExistingResources(theRequestPartitionId,
resourceTypeToIds.get(resourceType));
idToPID.putAll(idAndPID);
}
return idToPID;
}
/** /**
* @deprecated This method doesn't take a partition ID as input, so it is unsafe. It * @deprecated This method doesn't take a partition ID as input, so it is unsafe. It
* should be reworked to include the partition ID before any new use is incorporated * should be reworked to include the partition ID before any new use is incorporated

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.search.elastic; package ca.uhn.fhir.jpa.search.elastic;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -24,7 +44,7 @@ public class IndexNamePrefixLayoutStrategy implements IndexLayoutStrategy {
@Autowired @Autowired
private DaoConfig myDaoConfig; private DaoConfig myDaoConfig;
static final Log log = (Log) LoggerFactory.make(Log.class, MethodHandles.lookup()); static final Log log = LoggerFactory.make(Log.class, MethodHandles.lookup());
public static final String NAME = "prefix"; public static final String NAME = "prefix";
public static final Pattern UNIQUE_KEY_EXTRACTION_PATTERN = Pattern.compile("(.*)-\\d{6}"); public static final Pattern UNIQUE_KEY_EXTRACTION_PATTERN = Pattern.compile("(.*)-\\d{6}");

View File

@ -21,12 +21,12 @@ public class ResourceVersionCacheSvcTest extends BaseJpaR4Test {
IIdType patientId = myPatientDao.create(patient).getId(); IIdType patientId = myPatientDao.create(patient).getId();
ResourceVersionMap versionMap = myResourceVersionCacheSvc.getVersionMap("Patient", SearchParameterMap.newSynchronous()); ResourceVersionMap versionMap = myResourceVersionCacheSvc.getVersionMap("Patient", SearchParameterMap.newSynchronous());
assertEquals(1, versionMap.size()); assertEquals(1, versionMap.size());
assertEquals("1", versionMap.getVersion(patientId)); assertEquals(1L, versionMap.getVersion(patientId));
patient.setGender(Enumerations.AdministrativeGender.MALE); patient.setGender(Enumerations.AdministrativeGender.MALE);
myPatientDao.update(patient); myPatientDao.update(patient);
versionMap = myResourceVersionCacheSvc.getVersionMap("Patient", SearchParameterMap.newSynchronous()); versionMap = myResourceVersionCacheSvc.getVersionMap("Patient", SearchParameterMap.newSynchronous());
assertEquals(1, versionMap.size()); assertEquals(1, versionMap.size());
assertEquals("2", versionMap.getVersion(patientId)); assertEquals(2L, versionMap.getVersion(patientId));
} }
} }

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.dao.index.IdHelperService; import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.dao.r4.TransactionProcessorVersionAdapterR4; import ca.uhn.fhir.jpa.dao.r4.TransactionProcessorVersionAdapterR4;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
@ -70,6 +71,8 @@ public class TransactionProcessorTest {
private MatchUrlService myMatchUrlService; private MatchUrlService myMatchUrlService;
@MockBean @MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@MockBean
private IResourceVersionSvc myResourceVersionSvc;
@MockBean(answer = Answers.RETURNS_DEEP_STUBS) @MockBean(answer = Answers.RETURNS_DEEP_STUBS)
private SessionImpl mySession; private SessionImpl mySession;

View File

@ -1,11 +1,12 @@
package ca.uhn.fhir.jpa.dao.index; package ca.uhn.fhir.jpa.dao.index;
import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap;
import ca.uhn.fhir.jpa.cache.ResourceVersionSvcDaoImpl;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ForcedId; import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -17,7 +18,6 @@ import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery; import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder; import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery; import javax.persistence.criteria.CriteriaQuery;
@ -28,13 +28,14 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class IdHelperServiceTest { public class ResourceVersionSvcTest {
// helper class to package up data for helper methods // helper class to package up data for helper methods
private class ResourceIdPackage { private class ResourceIdPackage {
@ -52,19 +53,15 @@ public class IdHelperServiceTest {
} }
@Mock @Mock
private IResourceTableDao myResourceTableDao; DaoRegistry myDaoRegistry;
@Mock @Mock
private DaoConfig myDaoConfig; IResourceTableDao myResourceTableDao;
@Mock @Mock
private MemoryCacheService myMemoryCacheService; IdHelperService myIdHelperService;
@Mock
private EntityManager myEntityManager;
// TODO KHS move the methods that use this out to a separate test class
@InjectMocks @InjectMocks
private IdHelperService myIdHelperService; private ResourceVersionSvcDaoImpl myResourceVersionSvc;
/** /**
* Gets a ResourceTable record for getResourceVersionsForPid * Gets a ResourceTable record for getResourceVersionsForPid
@ -92,20 +89,7 @@ public class IdHelperServiceTest {
Root<ForcedId> from = Mockito.mock(Root.class); Root<ForcedId> from = Mockito.mock(Root.class);
Path path = Mockito.mock(Path.class); Path path = Mockito.mock(Path.class);
Mockito.when(cb.createQuery(Mockito.any(Class.class)))
.thenReturn(criteriaQuery);
Mockito.when(criteriaQuery.from(Mockito.any(Class.class)))
.thenReturn(from);
Mockito.when(from.get(Mockito.anyString()))
.thenReturn(path);
TypedQuery<ForcedId> queryMock = Mockito.mock(TypedQuery.class); TypedQuery<ForcedId> queryMock = Mockito.mock(TypedQuery.class);
Mockito.when(queryMock.getResultList()).thenReturn(new ArrayList<>()); // not found
Mockito.when(myEntityManager.getCriteriaBuilder())
.thenReturn(cb);
Mockito.when(myEntityManager.createQuery(Mockito.any(CriteriaQuery.class)))
.thenReturn(queryMock);
} }
/** /**
@ -130,15 +114,11 @@ public class IdHelperServiceTest {
ResourcePersistentId first = resourcePersistentIds.remove(0); ResourcePersistentId first = resourcePersistentIds.remove(0);
if (resourcePersistentIds.isEmpty()) { if (resourcePersistentIds.isEmpty()) {
Mockito.when(myMemoryCacheService.getIfPresent(Mockito.any(MemoryCacheService.CacheEnum.class), Mockito.anyString())) when(myIdHelperService.resolveResourcePersistentIdsWithCache(any(), any())).thenReturn(Collections.singletonList(first));
.thenReturn(first).thenReturn(null);
} }
else { else {
Mockito.when(myMemoryCacheService.getIfPresent(Mockito.any(MemoryCacheService.CacheEnum.class), Mockito.anyString())) when(myIdHelperService.resolveResourcePersistentIdsWithCache(any(), any())).thenReturn(resourcePersistentIds);
.thenReturn(first, resourcePersistentIds.toArray());
} }
Mockito.when(myResourceTableDao.getResourceVersionsForPid(Mockito.anyList()))
.thenReturn(matches);
} }
@Test @Test
@ -154,7 +134,7 @@ public class IdHelperServiceTest {
mockReturnsFor_getIdsOfExistingResources(pack); mockReturnsFor_getIdsOfExistingResources(pack);
// test // test
Map<IIdType, ResourcePersistentId> retMap = myIdHelperService.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(), ResourcePersistentIdMap retMap = myResourceVersionSvc.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(),
Collections.singletonList(type)); Collections.singletonList(type));
Assertions.assertTrue(retMap.containsKey(type)); Assertions.assertTrue(retMap.containsKey(type));
@ -169,7 +149,7 @@ public class IdHelperServiceTest {
mock_resolveResourcePersistentIdsWithCache_toReturnNothing(); mock_resolveResourcePersistentIdsWithCache_toReturnNothing();
// test // test
Map<IIdType, ResourcePersistentId> retMap = myIdHelperService.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(), ResourcePersistentIdMap retMap = myResourceVersionSvc.getLatestVersionIdsForResourceIds(RequestPartitionId.allPartitions(),
Collections.singletonList(type)); Collections.singletonList(type));
Assertions.assertTrue(retMap.isEmpty()); Assertions.assertTrue(retMap.isEmpty());
@ -191,7 +171,7 @@ public class IdHelperServiceTest {
mockReturnsFor_getIdsOfExistingResources(pack); mockReturnsFor_getIdsOfExistingResources(pack);
// test // test
Map<IIdType, ResourcePersistentId> retMap = myIdHelperService.getLatestVersionIdsForResourceIds( ResourcePersistentIdMap retMap = myResourceVersionSvc.getLatestVersionIdsForResourceIds(
RequestPartitionId.allPartitions(), RequestPartitionId.allPartitions(),
Arrays.asList(type, type2) Arrays.asList(type, type2)
); );

View File

@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCacheFactory; import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCacheFactory;
import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCacheRefresherImpl; import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCacheRefresherImpl;
import ca.uhn.fhir.jpa.cache.ResourceChangeListenerRegistryImpl; import ca.uhn.fhir.jpa.cache.ResourceChangeListenerRegistryImpl;
import ca.uhn.fhir.jpa.cache.ResourcePersistentIdMap;
import ca.uhn.fhir.jpa.cache.ResourceVersionMap; import ca.uhn.fhir.jpa.cache.ResourceVersionMap;
import ca.uhn.fhir.jpa.dao.JpaResourceDao; import ca.uhn.fhir.jpa.dao.JpaResourceDao;
import ca.uhn.fhir.jpa.dao.TransactionProcessor; import ca.uhn.fhir.jpa.dao.TransactionProcessor;
@ -45,6 +46,7 @@ import com.google.common.collect.Lists;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.hibernate.internal.SessionImpl; import org.hibernate.internal.SessionImpl;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.ExplanationOfBenefit; import org.hl7.fhir.r4.model.ExplanationOfBenefit;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -323,10 +325,15 @@ public class GiantTransactionPerfTest {
@Nonnull @Nonnull
@Override @Override
public ResourceVersionMap getVersionMap(String theResourceName, SearchParameterMap theSearchParamMap) { public ResourceVersionMap getVersionMap(RequestPartitionId theRequestPartitionId, String theResourceName, SearchParameterMap theSearchParamMap) {
myGetVersionMap++; myGetVersionMap++;
return ResourceVersionMap.fromResources(Lists.newArrayList()); return ResourceVersionMap.fromResources(Lists.newArrayList());
} }
@Override
public ResourcePersistentIdMap getLatestVersionIdsForResourceIds(RequestPartitionId thePartition, List<IIdType> theIds) {
return null;
}
} }
private class MockResourceHistoryTableDao implements IResourceHistoryTableDao { private class MockResourceHistoryTableDao implements IResourceHistoryTableDao {

View File

@ -20,9 +20,12 @@ package ca.uhn.fhir.jpa.cache;
* #L% * #L%
*/ */
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.List;
/** /**
* This interface is used by the {@link IResourceChangeListenerCacheRefresher} to read resources matching the provided * This interface is used by the {@link IResourceChangeListenerCacheRefresher} to read resources matching the provided
@ -30,5 +33,12 @@ import javax.annotation.Nonnull;
*/ */
public interface IResourceVersionSvc { public interface IResourceVersionSvc {
@Nonnull @Nonnull
ResourceVersionMap getVersionMap(String theResourceName, SearchParameterMap theSearchParamMap); ResourceVersionMap getVersionMap(RequestPartitionId theRequestPartitionId, String theResourceName, SearchParameterMap theSearchParamMap);
@Nonnull
default ResourceVersionMap getVersionMap(String theResourceName, SearchParameterMap theSearchParamMap) {
return getVersionMap(RequestPartitionId.allPartitions(), theResourceName, theSearchParamMap);
}
ResourcePersistentIdMap getLatestVersionIdsForResourceIds(RequestPartitionId thePartition, List<IIdType> theIds);
} }

View File

@ -173,8 +173,8 @@ public class ResourceChangeListenerCacheRefresherImpl implements IResourceChange
List<IIdType> updatedIds = new ArrayList<>(); List<IIdType> updatedIds = new ArrayList<>();
for (IIdType id : theNewResourceVersionMap.keySet()) { for (IIdType id : theNewResourceVersionMap.keySet()) {
String previousValue = theOldResourceVersionCache.put(id, theNewResourceVersionMap.get(id)); Long previousValue = theOldResourceVersionCache.put(id, theNewResourceVersionMap.get(id));
IIdType newId = id.withVersion(theNewResourceVersionMap.get(id)); IIdType newId = id.withVersion(theNewResourceVersionMap.get(id).toString());
if (previousValue == null) { if (previousValue == null) {
createdIds.add(newId); createdIds.add(newId);
} else if (!theNewResourceVersionMap.get(id).equals(previousValue)) { } else if (!theNewResourceVersionMap.get(id).equals(previousValue)) {

View File

@ -0,0 +1,67 @@
package ca.uhn.fhir.jpa.cache;
/*-
* #%L
* HAPI FHIR Search Parameters
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ResourcePersistentIdMap {
private final Map<IIdType, ResourcePersistentId> myMap = new HashMap<>();
public static ResourcePersistentIdMap fromResourcePersistentIds(List<ResourcePersistentId> theResourcePersistentIds) {
ResourcePersistentIdMap retval = new ResourcePersistentIdMap();
theResourcePersistentIds.forEach(retval::add);
return retval;
}
private void add(ResourcePersistentId theResourcePersistentId) {
IIdType id = theResourcePersistentId.getAssociatedResourceId();
myMap.put(id.toUnqualifiedVersionless(), theResourcePersistentId);
}
public boolean containsKey(IIdType theId) {
return myMap.containsKey(theId.toUnqualifiedVersionless());
}
public ResourcePersistentId getResourcePersistentId(IIdType theId) {
return myMap.get(theId.toUnqualifiedVersionless());
}
public boolean isEmpty() {
return myMap.isEmpty();
}
public int size() {
return myMap.size();
}
public void put(IIdType theId, ResourcePersistentId thePid) {
myMap.put(theId, thePid);
}
public void putAll(ResourcePersistentIdMap theIdAndPID) {
myMap.putAll(theIdAndPID.myMap);
}
}

View File

@ -32,7 +32,7 @@ import java.util.Set;
* detect resources that were modified on remote servers in our cluster. * detect resources that were modified on remote servers in our cluster.
*/ */
public class ResourceVersionCache { public class ResourceVersionCache {
private final Map<IIdType, String> myVersionMap = new HashMap<>(); private final Map<IIdType, Long> myVersionMap = new HashMap<>();
public void clear() { public void clear() {
myVersionMap.clear(); myVersionMap.clear();
@ -43,15 +43,15 @@ public class ResourceVersionCache {
* @param theVersion * @param theVersion
* @return previous value * @return previous value
*/ */
public String put(IIdType theResourceId, String theVersion) { public Long put(IIdType theResourceId, Long theVersion) {
return myVersionMap.put(new IdDt(theResourceId).toVersionless(), theVersion); return myVersionMap.put(new IdDt(theResourceId).toVersionless(), theVersion);
} }
public String getVersionForResourceId(IIdType theResourceId) { public Long getVersionForResourceId(IIdType theResourceId) {
return myVersionMap.get(new IdDt(theResourceId)); return myVersionMap.get(new IdDt(theResourceId));
} }
public String removeResourceId(IIdType theResourceId) { public Long removeResourceId(IIdType theResourceId) {
return myVersionMap.remove(new IdDt(theResourceId)); return myVersionMap.remove(new IdDt(theResourceId));
} }

View File

@ -24,6 +24,8 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IdDt;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -36,8 +38,10 @@ import java.util.Set;
* This immutable map holds a copy of current resource versions read from the repository. * This immutable map holds a copy of current resource versions read from the repository.
*/ */
public class ResourceVersionMap { public class ResourceVersionMap {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceVersionMap.class);
private final Set<IIdType> mySourceIds = new HashSet<>(); private final Set<IIdType> mySourceIds = new HashSet<>();
private final Map<IIdType, String> myMap = new HashMap<>(); // Key versionless id, value version
private final Map<IIdType, Long> myMap = new HashMap<>();
private ResourceVersionMap() {} private ResourceVersionMap() {}
public static ResourceVersionMap fromResourceTableEntities(List<ResourceTable> theEntities) { public static ResourceVersionMap fromResourceTableEntities(List<ResourceTable> theEntities) {
@ -57,13 +61,17 @@ public class ResourceVersionMap {
} }
private void add(IIdType theId) { private void add(IIdType theId) {
if (theId.getVersionIdPart() == null) {
ourLog.warn("Not storing {} in ResourceVersionMap because it does not have a version.", theId);
return;
}
IdDt id = new IdDt(theId); IdDt id = new IdDt(theId);
mySourceIds.add(id); mySourceIds.add(id);
myMap.put(id.toUnqualifiedVersionless(), id.getVersionIdPart()); myMap.put(id.toUnqualifiedVersionless(), id.getVersionIdPartAsLong());
} }
public String getVersion(IIdType theResourceId) { public Long getVersion(IIdType theResourceId) {
return myMap.get(new IdDt(theResourceId.toUnqualifiedVersionless())); return get(theResourceId);
} }
public int size() { public int size() {
@ -78,11 +86,15 @@ public class ResourceVersionMap {
return Collections.unmodifiableSet(mySourceIds); return Collections.unmodifiableSet(mySourceIds);
} }
public String get(IIdType theId) { public Long get(IIdType theId) {
return myMap.get(new IdDt(theId.toUnqualifiedVersionless())); return myMap.get(new IdDt(theId.toUnqualifiedVersionless()));
} }
public boolean containsKey(IIdType theId) { public boolean containsKey(IIdType theId) {
return myMap.containsKey(new IdDt(theId.toUnqualifiedVersionless())); return myMap.containsKey(new IdDt(theId.toUnqualifiedVersionless()));
} }
public boolean isEmpty() {
return myMap.isEmpty();
}
} }

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.mdm.api; package ca.uhn.fhir.mdm.api;
/*-
* #%L
* HAPI FHIR - Master Data Management
* %%
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IMdmBatchJobSubmitterFactory { public interface IMdmBatchJobSubmitterFactory {
IMdmClearJobSubmitter getClearJobSubmitter(); IMdmClearJobSubmitter getClearJobSubmitter();
} }

View File

@ -33,6 +33,7 @@ import java.util.Optional;
* a Long, a String, or something else. * a Long, a String, or something else.
*/ */
public class ResourcePersistentId { public class ResourcePersistentId {
private static final String RESOURCE_PID = "RESOURCE_PID";
private Object myId; private Object myId;
private Long myVersion; private Long myVersion;
private IIdType myAssociatedResourceId; private IIdType myAssociatedResourceId;