Bypass jpa for lastn results (#3226)
* Add flag in DaoConfig to enable storing resources in lucene index * Update loadResourceByPid to use ElasticSearch for loading observation resource and test added to check it * Added parsing to elastic search to parse inlined observation resources json to resource object
This commit is contained in:
parent
a41d79ed22
commit
1c318aac1e
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
type: add
|
||||
issue: 2838
|
||||
title: "The resource contents are optionally added to the lucene index so $lastn queries can be satisfied without
|
||||
database access. This is activated by the DaoConfig 'StoreResourceInLuceneIndex' property."
|
|
@ -21,7 +21,9 @@ package ca.uhn.fhir.jpa.dao;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.context.*;
|
||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceEncodingEnum;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
|
||||
import ca.uhn.fhir.jpa.model.util.CodeSystemHash;
|
||||
import ca.uhn.fhir.jpa.search.lastn.IElasticsearchSvc;
|
||||
|
@ -29,6 +31,7 @@ import ca.uhn.fhir.jpa.search.lastn.json.CodeJson;
|
|||
import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
|
||||
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
|
||||
import ca.uhn.fhir.jpa.searchparam.extractor.PathAndRef;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import org.hl7.fhir.instance.model.api.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
|
@ -42,6 +45,12 @@ public class ObservationLastNIndexPersistSvc {
|
|||
@Autowired(required = false)
|
||||
private IElasticsearchSvc myElasticsearchSvc;
|
||||
|
||||
@Autowired
|
||||
private DaoConfig myConfig;
|
||||
|
||||
@Autowired
|
||||
private FhirContext myContext;
|
||||
|
||||
public void indexObservation(IBaseResource theResource) {
|
||||
|
||||
if (myElasticsearchSvc == null) {
|
||||
|
@ -74,15 +83,14 @@ public class ObservationLastNIndexPersistSvc {
|
|||
|
||||
List<IBase> observationCategoryCodeableConcepts = mySearchParameterExtractor.extractValues("Observation.category", theResource);
|
||||
|
||||
String resourcePID = theResource.getIdElement().getIdPart();
|
||||
|
||||
createOrUpdateIndexedObservation(resourcePID, effectiveDtm, subjectId, observationCodeCodeableConcepts, observationCategoryCodeableConcepts);
|
||||
createOrUpdateIndexedObservation(theResource, effectiveDtm, subjectId, observationCodeCodeableConcepts, observationCategoryCodeableConcepts);
|
||||
|
||||
}
|
||||
|
||||
private void createOrUpdateIndexedObservation(String resourcePID, Date theEffectiveDtm, String theSubjectId,
|
||||
private void createOrUpdateIndexedObservation(IBaseResource theResource, Date theEffectiveDtm, String theSubjectId,
|
||||
List<IBase> theObservationCodeCodeableConcepts,
|
||||
List<IBase> theObservationCategoryCodeableConcepts) {
|
||||
String resourcePID = theResource.getIdElement().getIdPart();
|
||||
|
||||
// Determine if an index already exists for Observation:
|
||||
ObservationJson indexedObservation = null;
|
||||
|
@ -95,6 +103,9 @@ public class ObservationLastNIndexPersistSvc {
|
|||
|
||||
indexedObservation.setEffectiveDtm(theEffectiveDtm);
|
||||
indexedObservation.setIdentifier(resourcePID);
|
||||
if (myConfig.isStoreResourceInLuceneIndex()) {
|
||||
indexedObservation.setResource(encodeResource(theResource));
|
||||
}
|
||||
indexedObservation.setSubject(theSubjectId);
|
||||
|
||||
addCodeToObservationIndex(theObservationCodeCodeableConcepts, indexedObservation);
|
||||
|
@ -105,6 +116,11 @@ public class ObservationLastNIndexPersistSvc {
|
|||
|
||||
}
|
||||
|
||||
private String encodeResource(IBaseResource theResource) {
|
||||
IParser parser = myContext.newJsonParser();
|
||||
return parser.encodeResourceToString(theResource);
|
||||
}
|
||||
|
||||
private void addCodeToObservationIndex(List<IBase> theObservationCodeCodeableConcepts,
|
||||
ObservationJson theIndexedObservation) {
|
||||
// Determine if a Normalized ID was created previously for Observation Code
|
||||
|
@ -172,7 +188,7 @@ public class ObservationLastNIndexPersistSvc {
|
|||
String code = param.getValue();
|
||||
String text = mySearchParameterExtractor.getDisplayTextForCoding(nextCoding);
|
||||
|
||||
String codeSystemHash = String.valueOf(CodeSystemHash.hashCodeSystem(system, code));
|
||||
String codeSystemHash = String.valueOf(CodeSystemHash.hashCodeSystem(system, code));
|
||||
CodeJson codeCodeableConceptDocument = myElasticsearchSvc.getObservationCodeDocument(codeSystemHash, text);
|
||||
if (codeCodeableConceptDocument != null) {
|
||||
codeCodeableConceptIdOptional = Optional.of(codeCodeableConceptDocument.getCodeableConceptId());
|
||||
|
|
|
@ -20,27 +20,26 @@ package ca.uhn.fhir.jpa.dao;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
|
||||
import ca.uhn.fhir.context.BaseRuntimeElementDefinition;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.parser.DataFormatException;
|
||||
import ca.uhn.fhir.parser.IParserErrorHandler;
|
||||
import ca.uhn.fhir.parser.JsonParser;
|
||||
import ca.uhn.fhir.util.IModelVisitor2;
|
||||
import ca.uhn.fhir.parser.LenientErrorHandler;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
import org.hl7.fhir.instance.model.api.IBase;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IPrimitiveType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||
|
||||
class TolerantJsonParser extends JsonParser {
|
||||
public class TolerantJsonParser extends JsonParser {
|
||||
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(TolerantJsonParser.class);
|
||||
private final FhirContext myContext;
|
||||
|
@ -52,7 +51,7 @@ class TolerantJsonParser extends JsonParser {
|
|||
* @param theResourcePid The ID of the resource that will be parsed with this parser. It would be ok to change the
|
||||
* datatype for this param if we ever need to since it's only used for logging.
|
||||
*/
|
||||
TolerantJsonParser(FhirContext theContext, IParserErrorHandler theParserErrorHandler, Long theResourcePid) {
|
||||
public TolerantJsonParser(FhirContext theContext, IParserErrorHandler theParserErrorHandler, Long theResourcePid) {
|
||||
super(theContext, theParserErrorHandler);
|
||||
myContext = theContext;
|
||||
myResourcePid = theResourcePid;
|
||||
|
@ -80,7 +79,7 @@ class TolerantJsonParser extends JsonParser {
|
|||
* ParserState.Primitive state too.
|
||||
*/
|
||||
|
||||
String msg = defaultString(e.getMessage());
|
||||
String msg = defaultString(e.getMessage(), "");
|
||||
if (msg.contains("Unexpected character ('.' (code 46))") || msg.contains("Invalid numeric value: Leading zeroes not allowed")) {
|
||||
Gson gson = new Gson();
|
||||
|
||||
|
@ -89,20 +88,19 @@ class TolerantJsonParser extends JsonParser {
|
|||
|
||||
T parsed = super.parseResource(theResourceType, corrected);
|
||||
|
||||
myContext.newTerser().visit(parsed, new IModelVisitor2() {
|
||||
@Override
|
||||
public boolean acceptElement(IBase theElement, List<IBase> theContainingElementPath, List<BaseRuntimeChildDefinition> theChildDefinitionPath, List<BaseRuntimeElementDefinition<?>> theElementDefinitionPath) {
|
||||
myContext.newTerser().visit(parsed, (theElement, theContainingElementPath, theChildDefinitionPath, theElementDefinitionPath) -> {
|
||||
|
||||
BaseRuntimeElementDefinition<?> def = theElementDefinitionPath.get(theElementDefinitionPath.size() - 1);
|
||||
if (def.getName().equals("decimal")) {
|
||||
IPrimitiveType<BigDecimal> decimal = (IPrimitiveType<BigDecimal>) theElement;
|
||||
String newPlainString = decimal.getValue().toPlainString();
|
||||
ourLog.warn("Correcting invalid previously saved decimal number for Resource[pid={}] - Was {} and now is {}", myResourcePid, decimal.getValueAsString(), newPlainString);
|
||||
decimal.setValueAsString(newPlainString);
|
||||
}
|
||||
|
||||
return true;
|
||||
BaseRuntimeElementDefinition<?> def = theElementDefinitionPath.get(theElementDefinitionPath.size() - 1);
|
||||
if (def.getName().equals("decimal")) {
|
||||
IPrimitiveType<BigDecimal> decimal = (IPrimitiveType<BigDecimal>) theElement;
|
||||
String oldValue = decimal.getValueAsString();
|
||||
String newValue = decimal.getValue().toPlainString();
|
||||
ourLog.warn("Correcting invalid previously saved decimal number for Resource[pid={}] - Was {} and now is {}",
|
||||
Objects.isNull(myResourcePid) ? "" : myResourcePid, oldValue, newValue);
|
||||
decimal.setValueAsString(newValue);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
return parsed;
|
||||
|
@ -111,4 +109,9 @@ class TolerantJsonParser extends JsonParser {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static TolerantJsonParser createWithLenientErrorHandling(FhirContext theContext, @Nullable Long theResourcePid) {
|
||||
LenientErrorHandler errorHandler = new LenientErrorHandler(false).setErrorOnInvalidValue(false);
|
||||
return new TolerantJsonParser(theContext, errorHandler, theResourcePid);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -544,7 +544,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
|
||||
List<IBaseResource> resources = new ArrayList<>();
|
||||
theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
|
||||
|
||||
// Hook: STORAGE_PRESHOW_RESOURCES
|
||||
resources = InterceptorUtil.fireStoragePreshowResource(resources, theRequestDetails, myInterceptorBroadcaster);
|
||||
|
||||
|
|
|
@ -121,6 +121,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -372,11 +373,10 @@ public class SearchBuilder implements ISearchBuilder {
|
|||
private List<ResourcePersistentId> executeLastNAgainstIndex(Integer theMaximumResults) {
|
||||
validateLastNIsEnabled();
|
||||
|
||||
// TODO MB we can satisfy resources directly if we put the resources in elastic.
|
||||
List<String> lastnResourceIds = myIElasticsearchSvc.executeLastN(myParams, myContext, theMaximumResults);
|
||||
|
||||
return lastnResourceIds.stream()
|
||||
.map(lastnResourceId -> myIdHelperService.resolveResourcePersistentIds(myRequestPartitionId,myResourceName,lastnResourceId))
|
||||
.map(lastnResourceId -> myIdHelperService.resolveResourcePersistentIds(myRequestPartitionId, myResourceName, lastnResourceId))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
@ -403,6 +403,7 @@ public class SearchBuilder implements ISearchBuilder {
|
|||
|
||||
/**
|
||||
* Combs through the params for any _id parameters and extracts the PIDs for them
|
||||
*
|
||||
* @param theTargetPids
|
||||
*/
|
||||
private void extractTargetPidsFromIdParams(HashSet<Long> theTargetPids) {
|
||||
|
@ -416,7 +417,7 @@ public class SearchBuilder implements ISearchBuilder {
|
|||
// we expect all _id values to be StringParams
|
||||
ids.add(((StringParam) param).getValue());
|
||||
} else if (param instanceof TokenParam) {
|
||||
ids.add(((TokenParam)param).getValue());
|
||||
ids.add(((TokenParam) param).getValue());
|
||||
} else {
|
||||
// we do not expect the _id parameter to be a non-string value
|
||||
throw new IllegalArgumentException("_id parameter must be a StringParam or TokenParam");
|
||||
|
@ -845,8 +846,22 @@ public class SearchBuilder implements ISearchBuilder {
|
|||
}
|
||||
|
||||
List<ResourcePersistentId> pids = new ArrayList<>(thePids);
|
||||
new QueryChunker<ResourcePersistentId>().chunk(pids, t -> doLoadPids(t, theIncludedPids, theResourceListToPopulate, theForHistoryOperation, position));
|
||||
// Can we fast track this loading by checking elastic search?
|
||||
if (isLoadingFromElasticSearchSupported(theIncludedPids.isEmpty())) {
|
||||
theResourceListToPopulate.addAll(loadObservationResourcesFromElasticSearch(thePids));
|
||||
} else {
|
||||
// We only chunk because some jdbc drivers can't handle long param lists.
|
||||
new QueryChunker<ResourcePersistentId>().chunk(pids, t -> doLoadPids(t, theIncludedPids, theResourceListToPopulate, theForHistoryOperation, position));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isLoadingFromElasticSearchSupported(boolean noIncludePids) {
|
||||
return noIncludePids && !Objects.isNull(myParams) && myParams.isLastN() && myDaoConfig.isStoreResourceInLuceneIndex()
|
||||
&& myContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.DSTU3);
|
||||
}
|
||||
|
||||
private List<IBaseResource> loadObservationResourcesFromElasticSearch(Collection<ResourcePersistentId> thePids) {
|
||||
return myIElasticsearchSvc.getObservationResources(thePids);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1368,7 +1383,7 @@ public class SearchBuilder implements ISearchBuilder {
|
|||
if (myNext == null) {
|
||||
|
||||
|
||||
for (Iterator<ResourcePersistentId> myPreResultsIterator = myAlsoIncludePids.iterator(); myPreResultsIterator.hasNext();) {
|
||||
for (Iterator<ResourcePersistentId> myPreResultsIterator = myAlsoIncludePids.iterator(); myPreResultsIterator.hasNext(); ) {
|
||||
ResourcePersistentId next = myPreResultsIterator.next();
|
||||
if (next != null)
|
||||
if (myPidSet.add(next)) {
|
||||
|
|
|
@ -21,19 +21,24 @@ package ca.uhn.fhir.jpa.search.lastn;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.dao.TolerantJsonParser;
|
||||
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
|
||||
import ca.uhn.fhir.jpa.model.entity.IBaseResourceEntity;
|
||||
import ca.uhn.fhir.jpa.model.util.CodeSystemHash;
|
||||
import ca.uhn.fhir.jpa.search.lastn.json.CodeJson;
|
||||
import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.searchparam.util.LastNParameterHelper;
|
||||
import ca.uhn.fhir.model.api.IQueryParameterType;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||
import ca.uhn.fhir.rest.param.DateParam;
|
||||
import ca.uhn.fhir.rest.param.ParamPrefixEnum;
|
||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||
import ca.uhn.fhir.rest.param.TokenParam;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
|
@ -70,6 +75,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilde
|
|||
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -77,8 +83,11 @@ import java.io.BufferedReader;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
|
@ -93,29 +102,31 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
|
|||
public static final String OBSERVATION_CODE_INDEX_SCHEMA_FILE = "ObservationCodeIndexSchema.json";
|
||||
|
||||
// Aggregation Constants
|
||||
private final String GROUP_BY_SUBJECT = "group_by_subject";
|
||||
private final String GROUP_BY_SYSTEM = "group_by_system";
|
||||
private final String GROUP_BY_CODE = "group_by_code";
|
||||
private final String MOST_RECENT_EFFECTIVE = "most_recent_effective";
|
||||
private static final String GROUP_BY_SUBJECT = "group_by_subject";
|
||||
private static final String GROUP_BY_SYSTEM = "group_by_system";
|
||||
private static final String GROUP_BY_CODE = "group_by_code";
|
||||
private static final String MOST_RECENT_EFFECTIVE = "most_recent_effective";
|
||||
|
||||
// Observation index document element names
|
||||
private final String OBSERVATION_IDENTIFIER_FIELD_NAME = "identifier";
|
||||
private final String OBSERVATION_SUBJECT_FIELD_NAME = "subject";
|
||||
private final String OBSERVATION_CODEVALUE_FIELD_NAME = "codeconceptcodingcode";
|
||||
private final String OBSERVATION_CODESYSTEM_FIELD_NAME = "codeconceptcodingsystem";
|
||||
private final String OBSERVATION_CODEHASH_FIELD_NAME = "codeconceptcodingcode_system_hash";
|
||||
private final String OBSERVATION_CODEDISPLAY_FIELD_NAME = "codeconceptcodingdisplay";
|
||||
private final String OBSERVATION_CODE_TEXT_FIELD_NAME = "codeconcepttext";
|
||||
private final String OBSERVATION_EFFECTIVEDTM_FIELD_NAME = "effectivedtm";
|
||||
private final String OBSERVATION_CATEGORYHASH_FIELD_NAME = "categoryconceptcodingcode_system_hash";
|
||||
private final String OBSERVATION_CATEGORYVALUE_FIELD_NAME = "categoryconceptcodingcode";
|
||||
private final String OBSERVATION_CATEGORYSYSTEM_FIELD_NAME = "categoryconceptcodingsystem";
|
||||
private final String OBSERVATION_CATEGORYDISPLAY_FIELD_NAME = "categoryconceptcodingdisplay";
|
||||
private final String OBSERVATION_CATEGORYTEXT_FIELD_NAME = "categoryconcepttext";
|
||||
private static final String OBSERVATION_IDENTIFIER_FIELD_NAME = "identifier";
|
||||
private static final String OBSERVATION_SUBJECT_FIELD_NAME = "subject";
|
||||
private static final String OBSERVATION_CODEVALUE_FIELD_NAME = "codeconceptcodingcode";
|
||||
private static final String OBSERVATION_CODESYSTEM_FIELD_NAME = "codeconceptcodingsystem";
|
||||
private static final String OBSERVATION_CODEHASH_FIELD_NAME = "codeconceptcodingcode_system_hash";
|
||||
private static final String OBSERVATION_CODEDISPLAY_FIELD_NAME = "codeconceptcodingdisplay";
|
||||
private static final String OBSERVATION_CODE_TEXT_FIELD_NAME = "codeconcepttext";
|
||||
private static final String OBSERVATION_EFFECTIVEDTM_FIELD_NAME = "effectivedtm";
|
||||
private static final String OBSERVATION_CATEGORYHASH_FIELD_NAME = "categoryconceptcodingcode_system_hash";
|
||||
private static final String OBSERVATION_CATEGORYVALUE_FIELD_NAME = "categoryconceptcodingcode";
|
||||
private static final String OBSERVATION_CATEGORYSYSTEM_FIELD_NAME = "categoryconceptcodingsystem";
|
||||
private static final String OBSERVATION_CATEGORYDISPLAY_FIELD_NAME = "categoryconceptcodingdisplay";
|
||||
private static final String OBSERVATION_CATEGORYTEXT_FIELD_NAME = "categoryconcepttext";
|
||||
|
||||
// Code index document element names
|
||||
private final String CODE_HASH = "codingcode_system_hash";
|
||||
private final String CODE_TEXT = "text";
|
||||
private static final String CODE_HASH = "codingcode_system_hash";
|
||||
private static final String CODE_TEXT = "text";
|
||||
|
||||
private static final String OBSERVATION_RESOURCE_NAME = "Observation";
|
||||
|
||||
private final RestHighLevelClient myRestHighLevelClient;
|
||||
|
||||
|
@ -124,6 +135,9 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
|
|||
@Autowired
|
||||
private PartitionSettings myPartitionSettings;
|
||||
|
||||
@Autowired
|
||||
private FhirContext myContext;
|
||||
|
||||
//This constructor used to inject a dummy partitionsettings in test.
|
||||
public ElasticsearchSvcImpl(PartitionSettings thePartitionSetings, String theProtocol, String theHostname, @Nullable String theUsername, @Nullable String thePassword) {
|
||||
this(theProtocol, theHostname, theUsername, thePassword);
|
||||
|
@ -731,6 +745,55 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
|
|||
myRestHighLevelClient.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IBaseResource> getObservationResources(Collection<ResourcePersistentId> thePids) {
|
||||
SearchRequest searchRequest = buildObservationResourceSearchRequest(thePids);
|
||||
try {
|
||||
// wip mb what is the limit to an ES hit count? 10k? We may need to chunk this :-(
|
||||
SearchResponse observationDocumentResponse = executeSearchRequest(searchRequest);
|
||||
SearchHit[] observationDocumentHits = observationDocumentResponse.getHits().getHits();
|
||||
IParser parser = TolerantJsonParser.createWithLenientErrorHandling(myContext, null);
|
||||
Class<? extends IBaseResource> resourceType = myContext.getResourceDefinition(OBSERVATION_RESOURCE_NAME).getImplementingClass();
|
||||
/**
|
||||
* @see ca.uhn.fhir.jpa.dao.BaseHapiFhirDao#toResource(Class, IBaseResourceEntity, Collection, boolean) for
|
||||
* details about parsing raw json to BaseResource
|
||||
*/
|
||||
// WIP what do we do with partition?
|
||||
// WIP what do we do with deleted observation resources
|
||||
// WIP how do you handle provenance?
|
||||
// Parse using tolerant parser
|
||||
return Arrays.stream(observationDocumentHits)
|
||||
.map(this::parseObservationJson)
|
||||
.map(observationJson -> parser.parseResource(resourceType, observationJson.getResource()))
|
||||
.collect(Collectors.toList());
|
||||
} catch (IOException theE) {
|
||||
// WIP do we fallback to JPA search then?
|
||||
throw new InvalidRequestException("Unable to execute observation document query for provided IDs " + thePids, theE);
|
||||
}
|
||||
}
|
||||
|
||||
private ObservationJson parseObservationJson(SearchHit theSearchHit) {
|
||||
try {
|
||||
return objectMapper.readValue(theSearchHit.getSourceAsString(), ObservationJson.class);
|
||||
} catch (JsonProcessingException exp) {
|
||||
throw new InvalidRequestException("Unable to parse the observation resource json", exp);
|
||||
}
|
||||
}
|
||||
|
||||
private SearchRequest buildObservationResourceSearchRequest(Collection<ResourcePersistentId> thePids) {
|
||||
SearchRequest searchRequest = new SearchRequest(OBSERVATION_INDEX);
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
// Query
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
List<String> pidParams = thePids.stream().map(Object::toString).collect(Collectors.toList());
|
||||
boolQueryBuilder.must(QueryBuilders.termsQuery(OBSERVATION_IDENTIFIER_FIELD_NAME, pidParams));
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.size(thePids.size());
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
return searchRequest;
|
||||
}
|
||||
|
||||
|
||||
private IndexRequest createIndexRequest(String theIndexName, String theDocumentId, String theObservationDocument, String theDocumentType) {
|
||||
IndexRequest request = new IndexRequest(theIndexName);
|
||||
request.id(theDocumentId);
|
||||
|
|
|
@ -24,23 +24,28 @@ import ca.uhn.fhir.context.FhirContext;
|
|||
import ca.uhn.fhir.jpa.search.lastn.json.CodeJson;
|
||||
import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public interface IElasticsearchSvc {
|
||||
|
||||
/**
|
||||
* Returns identifiers for the last most recent N observations that meet the specified criteria.
|
||||
*
|
||||
* @param theSearchParameterMap SearchParameterMap containing search parameters used for filtering the last N observations. Supported parameters include Subject, Patient, Code, Category and Max (the parameter used to determine N).
|
||||
* @param theFhirContext Current FhirContext.
|
||||
* @param theMaxResultsToFetch The maximum number of results to return for the purpose of paging.
|
||||
* @param theFhirContext Current FhirContext.
|
||||
* @param theMaxResultsToFetch The maximum number of results to return for the purpose of paging.
|
||||
* @return
|
||||
*/
|
||||
List<String> executeLastN(SearchParameterMap theSearchParameterMap, FhirContext theFhirContext, Integer theMaxResultsToFetch);
|
||||
|
||||
/**
|
||||
* Returns index document for a single Observation
|
||||
*
|
||||
* @param theDocumentID Identifier of Observation resource.
|
||||
* @return
|
||||
*/
|
||||
|
@ -48,15 +53,17 @@ public interface IElasticsearchSvc {
|
|||
|
||||
/**
|
||||
* Returns index document for a single Observation Code that either has a coding that matches a specified Code value and system or that has a specified text value.
|
||||
*
|
||||
* @param theCodeSystemHash A hash string constructed from a Code value and Code system used to match to an Observation Code.
|
||||
* @param theText A text value used to match to an Observation Code.
|
||||
* @param theText A text value used to match to an Observation Code.
|
||||
* @return
|
||||
*/
|
||||
CodeJson getObservationCodeDocument(String theCodeSystemHash, String theText);
|
||||
|
||||
/**
|
||||
* Creates or updates index for an Observation Resource.
|
||||
* @param theDocumentId Identifier for Observation resource.
|
||||
*
|
||||
* @param theDocumentId Identifier for Observation resource.
|
||||
* @param theObservationDocument Indexing document for Observation.
|
||||
* @return True if Observation indexed successfully.
|
||||
*/
|
||||
|
@ -64,7 +71,8 @@ public interface IElasticsearchSvc {
|
|||
|
||||
/**
|
||||
* Creates or updates index for an Observation Code.
|
||||
* @param theCodeableConceptID Identifier for Observation resource.
|
||||
*
|
||||
* @param theCodeableConceptID Identifier for Observation resource.
|
||||
* @param theObservationCodeDocument Indexing document for Observation.
|
||||
* @return True if Observation Code indexed successfully.
|
||||
*/
|
||||
|
@ -72,6 +80,7 @@ public interface IElasticsearchSvc {
|
|||
|
||||
/**
|
||||
* Deletes index for an Observation Resource.
|
||||
*
|
||||
* @param theDocumentId Identifier for Observation resource.
|
||||
*/
|
||||
void deleteObservationDocument(String theDocumentId);
|
||||
|
@ -81,4 +90,12 @@ public interface IElasticsearchSvc {
|
|||
*/
|
||||
void close() throws IOException;
|
||||
|
||||
/**
|
||||
* Returns inlined observation resource stored along with index mappings for matched identifiers
|
||||
*
|
||||
* @param thePids
|
||||
* @return Resources list or empty if nothing found
|
||||
*/
|
||||
List<IBaseResource> getObservationResources(Collection<ResourcePersistentId> thePids);
|
||||
|
||||
}
|
||||
|
|
|
@ -75,6 +75,9 @@ public class ObservationJson {
|
|||
@JsonProperty(value = "effectivedtm", required = true)
|
||||
private Date myEffectiveDtm;
|
||||
|
||||
@JsonProperty(value = "resource")
|
||||
private String myResource;
|
||||
|
||||
public ObservationJson() {
|
||||
}
|
||||
|
||||
|
@ -186,4 +189,11 @@ public class ObservationJson {
|
|||
return myIdentifier;
|
||||
}
|
||||
|
||||
public String getResource() {
|
||||
return myResource;
|
||||
}
|
||||
|
||||
public void setResource(String theResource) {
|
||||
myResource = theResource;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,17 +31,17 @@ import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
|
|||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class InterceptorUtil {
|
||||
|
||||
/**
|
||||
* Fires {@link Pointcut#STORAGE_PRESHOW_RESOURCES} interceptor hook, and potentially remove resources
|
||||
* from the resource list
|
||||
* @return
|
||||
*/
|
||||
public static List<IBaseResource> fireStoragePreshowResource(List<IBaseResource> theResources, RequestDetails theRequest, IInterceptorBroadcaster theInterceptorBroadcaster) {
|
||||
List<IBaseResource> retVal = theResources;
|
||||
retVal.removeIf(t -> t == null);
|
||||
retVal.removeIf(Objects::isNull);
|
||||
|
||||
// Interceptor call: STORAGE_PRESHOW_RESOURCE
|
||||
// This can be used to remove results from the search result details before
|
||||
|
@ -55,7 +55,7 @@ public class InterceptorUtil {
|
|||
CompositeInterceptorBroadcaster.doCallHooks(theInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PRESHOW_RESOURCES, params);
|
||||
|
||||
retVal = accessDetails.toList();
|
||||
retVal.removeIf(t -> t == null);
|
||||
retVal.removeIf(Objects::isNull);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
@ -86,8 +87,10 @@ abstract public class BaseR4SearchLastN extends BaseJpaTest {
|
|||
protected FhirContext myFhirCtx;
|
||||
@Autowired
|
||||
protected PlatformTransactionManager myPlatformTransactionManager;
|
||||
|
||||
@SpyBean
|
||||
@Autowired
|
||||
private ElasticsearchSvcImpl myElasticsearchSvc;
|
||||
protected ElasticsearchSvcImpl myElasticsearchSvc;
|
||||
|
||||
@Override
|
||||
protected FhirContext getContext() {
|
||||
|
@ -143,8 +146,12 @@ abstract public class BaseR4SearchLastN extends BaseJpaTest {
|
|||
createFiveObservationsForPatientCodeCategory(thePatientId, observationCd3, categoryCd3, 5);
|
||||
}
|
||||
|
||||
private void createFiveObservationsForPatientCodeCategory(IIdType thePatientId, String theObservationCode, String theCategoryCode,
|
||||
Integer theTimeOffset) {
|
||||
/**
|
||||
* Create and return observation ids
|
||||
*/
|
||||
protected List<IIdType> createFiveObservationsForPatientCodeCategory(IIdType thePatientId, String theObservationCode, String theCategoryCode,
|
||||
Integer theTimeOffset) {
|
||||
List<IIdType> observerationIds = new ArrayList<>();
|
||||
|
||||
for (int idx = 0; idx < 5; idx++) {
|
||||
Observation obs = new Observation();
|
||||
|
@ -154,12 +161,15 @@ abstract public class BaseR4SearchLastN extends BaseJpaTest {
|
|||
Date effectiveDtm = calculateObservationDateFromOffset(theTimeOffset, idx);
|
||||
obs.setEffective(new DateTimeType(effectiveDtm));
|
||||
obs.getCategoryFirstRep().addCoding().setCode(theCategoryCode).setSystem(categorySystem);
|
||||
String observationId = myObservationDao.create(obs, mockSrd()).getId().toUnqualifiedVersionless().getValue();
|
||||
observationPatientMap.put(observationId, thePatientId.getValue());
|
||||
observationCategoryMap.put(observationId, theCategoryCode);
|
||||
observationCodeMap.put(observationId, theObservationCode);
|
||||
observationEffectiveMap.put(observationId, effectiveDtm);
|
||||
IIdType observationId = myObservationDao.create(obs, mockSrd()).getId().toUnqualifiedVersionless();
|
||||
String observationIdValue = observationId.getValue();
|
||||
observationPatientMap.put(observationIdValue, thePatientId.getValue());
|
||||
observationCategoryMap.put(observationIdValue, theCategoryCode);
|
||||
observationCodeMap.put(observationIdValue, theObservationCode);
|
||||
observationEffectiveMap.put(observationIdValue, effectiveDtm);
|
||||
observerationIds.add(observationId);
|
||||
}
|
||||
return observerationIds;
|
||||
}
|
||||
|
||||
private Date calculateObservationDateFromOffset(Integer theTimeOffset, Integer theObservationIndex) {
|
||||
|
|
|
@ -1,31 +1,44 @@
|
|||
package ca.uhn.fhir.jpa.dao.r4;
|
||||
|
||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
|
||||
import ca.uhn.fhir.jpa.search.lastn.ElasticsearchSvcImpl;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||
import ca.uhn.fhir.rest.param.TokenParam;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.Observation;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.matchesPattern;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
public class FhirResourceDaoR4SearchLastNIT extends BaseR4SearchLastN {
|
||||
|
||||
@AfterEach
|
||||
public void resetMaximumPageSize() {
|
||||
public void reset() {
|
||||
SearchBuilder.setMaxPageSize50ForTest(false);
|
||||
myDaoConfig.setStoreResourceInLuceneIndex(new DaoConfig().isStoreResourceInLuceneIndex());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -87,4 +100,43 @@ public class FhirResourceDaoR4SearchLastNIT extends BaseR4SearchLastN {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastN_onEnablingStoringObservationWithIndexMapping_shouldSkipLoadingResourceFromDB() throws IOException {
|
||||
// Enable flag
|
||||
myDaoConfig.setStoreResourceInLuceneIndex(true);
|
||||
|
||||
// Create Data
|
||||
Patient pt = new Patient();
|
||||
pt.addName().setFamily("lastn-" + UUID.randomUUID()).addGiven("LastNPat1");
|
||||
IIdType patient1 = myPatientDao.create(pt, mockSrd()).getId().toUnqualifiedVersionless();
|
||||
List<IIdType> observationIds = createFiveObservationsForPatientCodeCategory(patient1, observationCd0, categoryCd0, 15);
|
||||
myElasticsearchSvc.refreshIndex(ElasticsearchSvcImpl.OBSERVATION_INDEX);
|
||||
myElasticsearchSvc.refreshIndex(ElasticsearchSvcImpl.OBSERVATION_CODE_INDEX);
|
||||
|
||||
// Create lastN search params
|
||||
|
||||
SearchParameterMap params = new SearchParameterMap();
|
||||
params.setLoadSynchronous(true);
|
||||
ReferenceParam subjectParam1 = new ReferenceParam("Patient", "", patient1.getValue());
|
||||
params.add(Observation.SP_SUBJECT, buildReferenceAndListParam(subjectParam1));
|
||||
|
||||
params.setLastN(true);
|
||||
params.setLastNMax(100);
|
||||
|
||||
Map<String, String[]> requestParameters = new HashMap<>();
|
||||
when(mySrd.getParameters()).thenReturn(requestParameters);
|
||||
|
||||
List<String> results = toUnqualifiedVersionlessIdValues(myObservationDao.observationsLastN(params, mockSrd(), null));
|
||||
List<ResourcePersistentId> expectedArgumentPids = ResourcePersistentId.fromLongList(
|
||||
observationIds.stream().map(IIdType::getIdPartAsLong).collect(Collectors.toList())
|
||||
);
|
||||
ArgumentCaptor<List<ResourcePersistentId>> actualPids = ArgumentCaptor.forClass(List.class);
|
||||
verify(myElasticsearchSvc, times(1)).getObservationResources(actualPids.capture());
|
||||
assertThat(actualPids.getValue(), is(expectedArgumentPids));
|
||||
|
||||
List<String> expectedObservationList = observationIds.stream()
|
||||
.map(id -> id.toUnqualifiedVersionless().getValue()).collect(Collectors.toList());
|
||||
assertThat(results, is(expectedObservationList));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -288,7 +288,12 @@ public class DaoConfig {
|
|||
private int myInlineResourceTextBelowSize = 0;
|
||||
|
||||
/**
|
||||
* @see FhirValidator#isConcurrentBundleValidation()
|
||||
* @since 5.7.0
|
||||
*/
|
||||
private boolean myStoreResourceInLuceneIndex;
|
||||
|
||||
/**
|
||||
* @see FhirValidator#isConcurrentBundleValidation()
|
||||
* @since 5.7.0
|
||||
*/
|
||||
private boolean myConcurrentBundleValidation;
|
||||
|
@ -2732,7 +2737,33 @@ public class DaoConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FhirValidator#isConcurrentBundleValidation()
|
||||
* Is storing of Resource in Lucene index enabled?
|
||||
*
|
||||
* @since 5.7.0
|
||||
*/
|
||||
public boolean isStoreResourceInLuceneIndex() {
|
||||
return myStoreResourceInLuceneIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Enable Resource to be stored inline with Lucene index mappings.
|
||||
* This is useful in cases where after performing a search operation the resulting resource identifiers don't have to be
|
||||
* looked up in the persistent storage, but rather the inline stored resource can be used instead.
|
||||
* </p>
|
||||
* <p>
|
||||
* For e.g - Storing Observation resource in lucene index would be useful when performing
|
||||
* <a href="https://www.hl7.org/fhir/observation-operation-lastn.html">$lastn</a> operation.
|
||||
* </p>
|
||||
*
|
||||
* @since 5.7.0
|
||||
*/
|
||||
public void setStoreResourceInLuceneIndex(boolean theStoreResourceInLuceneIndex) {
|
||||
myStoreResourceInLuceneIndex = theStoreResourceInLuceneIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see FhirValidator#isConcurrentBundleValidation()
|
||||
* @since 5.7.0
|
||||
*/
|
||||
public boolean isConcurrentBundleValidation() {
|
||||
|
|
Loading…
Reference in New Issue