Test fixes
This commit is contained in:
parent
7ca27022b7
commit
fbb281f39f
|
@ -242,11 +242,11 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
|
|||
+ " FROM ResourceTable r "
|
||||
+ " LEFT JOIN ResourceHistoryTable h "
|
||||
+ " on r.myVersion = h.myResourceVersion and r.id = h.myResourceId " +
|
||||
// fixme cosmos hack
|
||||
"and r.myPartitionIdValue = h.myPartitionIdValue "
|
||||
// fixme cosmos hack
|
||||
"and r.myPartitionIdValue = h.myPartitionIdValue "
|
||||
+ " left join h.myProvenance p "
|
||||
// fixme cosmos hack
|
||||
+ "on h = p.myResourceHistoryTable and h.myPartitionIdValue = p.myPartitionIdValue"
|
||||
// fixme cosmos hack
|
||||
+ "on h = p.myResourceHistoryTable and h.myPartitionIdValue = p.myPartitionIdValue"
|
||||
+ " WHERE r.myId IN ( :IDS ) ");
|
||||
query.setParameter("IDS", idChunk);
|
||||
|
||||
|
|
|
@ -1657,10 +1657,17 @@ public class SearchBuilder implements ISearchBuilder<JpaPid> {
|
|||
canonicalUriQueryParams.put("uri_identity_hashes", identityHashesForTypes);
|
||||
}
|
||||
|
||||
canonicalUrlQuery.append(" AND r.target_resource_url = rUri.sp_uri )" + " WHERE r.src_path = :src_path AND "
|
||||
+ " r.target_resource_id IS NULL AND "
|
||||
+ " r."
|
||||
+ thePidFieldSqlColumn + " IN (:target_pids) ");
|
||||
canonicalUrlQuery.append(" AND r.target_resource_url = rUri.sp_uri ");
|
||||
// fixme cosmos
|
||||
if (true) {
|
||||
canonicalUrlQuery.append(" AND r.partition_id = rUri.partition_id ");
|
||||
}
|
||||
canonicalUrlQuery.append(")");
|
||||
canonicalUrlQuery.append(" WHERE r.src_path = :src_path AND ");
|
||||
canonicalUrlQuery.append(" r.target_resource_id IS NULL AND ");
|
||||
canonicalUrlQuery.append(" r.");
|
||||
canonicalUrlQuery.append(thePidFieldSqlColumn);
|
||||
canonicalUrlQuery.append(" IN (:target_pids) ");
|
||||
return Pair.of(canonicalUrlQuery.toString(), canonicalUriQueryParams);
|
||||
}
|
||||
|
||||
|
|
|
@ -458,12 +458,24 @@ public class SearchQueryBuilder {
|
|||
DbColumn theFromColumn,
|
||||
DbColumn theToColumn,
|
||||
SelectQuery.JoinType theJoinType) {
|
||||
Join join = new DbJoin(
|
||||
mySpec, theFromTable, theToTable, new DbColumn[] {theFromColumn}, new DbColumn[] {theToColumn});
|
||||
DbColumn[] fromColumns;
|
||||
DbColumn[] toColumns;
|
||||
|
||||
if (true) {
|
||||
// fixme cosmos hack
|
||||
fromColumns = new DbColumn[]{theFromColumn, theFromTable.findColumn("PARTITION_ID")};
|
||||
toColumns = new DbColumn[]{theToColumn, theToTable.findColumn("PARTITION_ID")};
|
||||
} else {
|
||||
fromColumns = new DbColumn[]{theFromColumn};
|
||||
toColumns = new DbColumn[]{theToColumn};
|
||||
}
|
||||
|
||||
Join join = new DbJoin(mySpec, theFromTable, theToTable, fromColumns, toColumns);
|
||||
mySelect.addJoins(theJoinType, join);
|
||||
}
|
||||
|
||||
public void addJoin(DbTable theFromTable, DbTable theToTable, DbColumn theFromColumn, DbColumn theToColumn) {
|
||||
// FIXME: who calls this and not the column above?
|
||||
Join join = new DbJoin(
|
||||
mySpec, theFromTable, theToTable, new DbColumn[] {theFromColumn}, new DbColumn[] {theToColumn});
|
||||
mySelect.addJoins(SelectQuery.JoinType.INNER, join);
|
||||
|
|
|
@ -0,0 +1,262 @@
|
|||
package ca.uhn.fhir.jpa.cosmos;
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoObservation;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
|
||||
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
|
||||
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
|
||||
import ca.uhn.fhir.jpa.entity.PartitionEntity;
|
||||
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
|
||||
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.test.BaseJpaTest;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.param.ReferenceParam;
|
||||
import ca.uhn.fhir.rest.param.TokenParam;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
|
||||
import ca.uhn.fhir.util.BundleBuilder;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.Enumerations;
|
||||
import org.hl7.fhir.r4.model.IdType;
|
||||
import org.hl7.fhir.r4.model.Meta;
|
||||
import org.hl7.fhir.r4.model.Observation;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
import org.hl7.fhir.r4.model.Reference;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@ContextConfiguration(classes = {
|
||||
TestCosmosR4Config.class
|
||||
})
|
||||
public class CosmosDbTest extends BaseJpaTest {
|
||||
public static final String PATIENT_P0 = "Patient/p0";
|
||||
public static final String OBSERVATION_O0 = "Observation/O0";
|
||||
@Autowired
|
||||
private PlatformTransactionManager myTxManager;
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
@Autowired
|
||||
private PartitionSettings myPartitionSettings;
|
||||
@Autowired
|
||||
private IFhirResourceDaoPatient<Patient> myPatientDao;
|
||||
@Autowired
|
||||
private IFhirResourceDaoObservation<Observation> myObservationDao;
|
||||
private CosmosPartitionInterceptor myPartitionInterceptor = new CosmosPartitionInterceptor();
|
||||
@Autowired
|
||||
private IFhirSystemDao<Bundle, Meta> mySystemDao;
|
||||
@Autowired
|
||||
private IResourceReindexingSvc myResourceReindexingSvc;
|
||||
@Autowired
|
||||
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||
@Autowired
|
||||
private ISearchParamRegistry mySearchParamRegistry;
|
||||
@Autowired
|
||||
private IBulkDataExportJobSchedulingHelper myBulkDataScheduleHelper;
|
||||
|
||||
@Override
|
||||
protected FhirContext getFhirContext() {
|
||||
return myFhirContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PlatformTransactionManager getTxManager() {
|
||||
return myTxManager;
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void beforeEach() {
|
||||
myPartitionSettings.setPartitioningEnabled(true);
|
||||
myPartitionSettings.setDefaultPartitionId(-1);
|
||||
|
||||
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
|
||||
|
||||
createPartitionIfItDoesntExist("P1", 1);
|
||||
createPartitionIfItDoesntExist("P2", 2);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void afterEach() {
|
||||
purgeDatabase();
|
||||
|
||||
myPartitionInterceptor.assertNoRemainingIds();
|
||||
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCreateWithClientAssignedIdAndRead() {
|
||||
// Create resource
|
||||
addCreatePartition1();
|
||||
Patient patient = new Patient();
|
||||
patient.setId(PATIENT_P0);
|
||||
patient.addIdentifier().setSystem("http://foo").setValue("123");
|
||||
patient.setGender(Enumerations.AdministrativeGender.MALE);
|
||||
myPatientDao.update(patient, mySrd);
|
||||
|
||||
// Read back
|
||||
addReadPartition(1);
|
||||
patient = myPatientDao.read(new IdType(PATIENT_P0), mySrd);
|
||||
assertEquals("123", patient.getIdentifierFirstRep().getValue());
|
||||
|
||||
// Try to read from wrong partition
|
||||
addReadPartition(2);
|
||||
try {
|
||||
myPatientDao.read(new IdType(PATIENT_P0), mySrd);
|
||||
fail();
|
||||
} catch (ResourceNotFoundException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChainedSearch() {
|
||||
// Create resource
|
||||
createPatientAndObservation();
|
||||
|
||||
// Test
|
||||
addReadPartition(1);
|
||||
SearchParameterMap map = new SearchParameterMap();
|
||||
map.add(Observation.SP_SUBJECT, new ReferenceParam("identifier", "http://foo|123"));
|
||||
IBundleProvider outcome = myObservationDao.search(map, mySrd);
|
||||
assertThat(toUnqualifiedVersionlessIdValues(outcome), contains(OBSERVATION_O0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncludes() {
|
||||
// Setup
|
||||
createPatientAndObservation();
|
||||
|
||||
// Test
|
||||
addReadPartition(1);
|
||||
SearchParameterMap map = SearchParameterMap
|
||||
.newSynchronous()
|
||||
.add(IAnyResource.SP_RES_ID, new TokenParam(OBSERVATION_O0))
|
||||
.addInclude(Observation.INCLUDE_PATIENT);
|
||||
IBundleProvider results = myObservationDao.search(map, mySrd);
|
||||
|
||||
// Verify
|
||||
assertThat(toUnqualifiedVersionlessIdValues(results), contains(OBSERVATION_O0, PATIENT_P0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRevIncludes() {
|
||||
// Setup
|
||||
createPatientAndObservation();
|
||||
|
||||
// Test
|
||||
addReadPartition(1);
|
||||
SearchParameterMap map = SearchParameterMap
|
||||
.newSynchronous()
|
||||
.add(IAnyResource.SP_RES_ID, new TokenParam(PATIENT_P0))
|
||||
.addRevInclude(Observation.INCLUDE_PATIENT);
|
||||
IBundleProvider results = myPatientDao.search(map, mySrd);
|
||||
|
||||
// Verify
|
||||
assertThat(toUnqualifiedVersionlessIdValues(results), contains(PATIENT_P0, OBSERVATION_O0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFhirTransaction() {
|
||||
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
Bundle requestBundle = createTransactionWithConditionalCreatePatientAndObservation();
|
||||
Bundle responseBundle = mySystemDao.transaction(mySrd, requestBundle);
|
||||
List<String> ids = toUnqualifiedVersionlessIdValues(responseBundle);
|
||||
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
addCreatePartition1();
|
||||
requestBundle = createTransactionWithConditionalCreatePatientAndObservation();
|
||||
responseBundle = mySystemDao.transaction(mySrd, requestBundle);
|
||||
List<String> ids2 = toUnqualifiedVersionlessIdValues(responseBundle);
|
||||
|
||||
assertEquals(ids, ids2);
|
||||
}
|
||||
|
||||
private Bundle createTransactionWithConditionalCreatePatientAndObservation() {
|
||||
BundleBuilder bb = new BundleBuilder(myFhirContext);
|
||||
|
||||
Patient patient = new Patient();
|
||||
patient.setId(IdType.newRandomUuid());
|
||||
patient.setActive(true);
|
||||
bb.addTransactionCreateEntry(patient, "?identifier=http://foo|111");
|
||||
|
||||
Observation obs = new Observation();
|
||||
obs.setId(IdType.newRandomUuid());
|
||||
obs.setSubject(new Reference(patient.getIdElement()));
|
||||
obs.addIdentifier().setSystem("http://foo").setValue("222");
|
||||
bb.addTransactionCreateEntry(obs, "?identifier=http://foo|222");
|
||||
|
||||
Bundle requestBundle = bb.getBundleTyped();
|
||||
return requestBundle;
|
||||
}
|
||||
|
||||
private void createPatientAndObservation() {
|
||||
addCreatePartition1();
|
||||
Patient patient = new Patient();
|
||||
patient.setId(PATIENT_P0);
|
||||
patient.addIdentifier().setSystem("http://foo").setValue("123");
|
||||
patient.setGender(Enumerations.AdministrativeGender.MALE);
|
||||
myPatientDao.update(patient, mySrd).getId().toUnqualifiedVersionless();
|
||||
|
||||
addCreatePartition1();
|
||||
Observation obs = new Observation();
|
||||
obs.setId(OBSERVATION_O0);
|
||||
obs.setSubject(new Reference(PATIENT_P0));
|
||||
myObservationDao.update(obs, mySrd).getId().toUnqualifiedVersionless();
|
||||
}
|
||||
|
||||
private void addReadPartition(int partitionId) {
|
||||
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionId(partitionId));
|
||||
}
|
||||
|
||||
private void addCreatePartition1() {
|
||||
myPartitionInterceptor.addCreatePartition(RequestPartitionId.fromPartitionId(1));
|
||||
}
|
||||
|
||||
private void createPartitionIfItDoesntExist(String partitionName, int partitionId) {
|
||||
PartitionEntity partition = new PartitionEntity();
|
||||
partition.setName(partitionName);
|
||||
partition.setId(partitionId);
|
||||
try {
|
||||
PartitionEntity found = myPartitionConfigSvc.getPartitionById(partition.getId());
|
||||
assertEquals(partition.getName(), found.getName());
|
||||
} catch (ResourceNotFoundException e) {
|
||||
myPartitionConfigSvc.createPartition(partition, mySrd);
|
||||
}
|
||||
}
|
||||
|
||||
private void purgeDatabase() {
|
||||
myPartitionInterceptor.assertNoRemainingIds();
|
||||
myPartitionInterceptor.addReadPartition(RequestPartitionId.allPartitions());
|
||||
purgeDatabase(myStorageSettings, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry, myBulkDataScheduleHelper);
|
||||
myPartitionInterceptor.assertNoRemainingIds();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package ca.uhn.fhir.jpa.cosmos;
|
||||
|
||||
import ca.uhn.fhir.interceptor.api.Hook;
|
||||
import ca.uhn.fhir.interceptor.api.Interceptor;
|
||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
|
||||
import com.helger.commons.lang.StackTraceHelper;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Interceptor
|
||||
public class CosmosPartitionInterceptor {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(CosmosPartitionInterceptor.class);
|
||||
|
||||
private final List<RequestPartitionId> myReadRequestPartitionIds = new ArrayList<>();
|
||||
private final List<RequestPartitionId> myCreateRequestPartitionIds = new ArrayList<>();
|
||||
|
||||
public void addReadPartition(RequestPartitionId theRequestPartitionId) {
|
||||
myReadRequestPartitionIds.add(theRequestPartitionId);
|
||||
ourLog.info("Adding partition {} for read (not have {})", theRequestPartitionId, myReadRequestPartitionIds.size());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
|
||||
public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails) {
|
||||
|
||||
// Just to be nice, figure out the first line in the stack that isn't a part of the
|
||||
// partitioning or interceptor infrastructure, just so it's obvious who is asking
|
||||
// for a partition ID
|
||||
String stack;
|
||||
try {
|
||||
throw new Exception();
|
||||
} catch (Exception e) {
|
||||
stack = StackTraceHelper.getStackAsString(e);
|
||||
stack = Arrays.stream(stack.split("\\n"))
|
||||
.filter(t -> t.contains("ca.uhn.fhir"))
|
||||
.filter(t -> !t.toLowerCase().contains("interceptor"))
|
||||
.filter(t -> !t.toLowerCase().contains("partitionhelper"))
|
||||
.findFirst()
|
||||
.orElse("UNKNOWN");
|
||||
}
|
||||
|
||||
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
|
||||
ourLog.info("Returning partition {} for read at: {}", retVal, stack);
|
||||
return retVal;
|
||||
}
|
||||
|
||||
public void addCreatePartition(RequestPartitionId theRequestPartitionId) {
|
||||
myCreateRequestPartitionIds.add(theRequestPartitionId);
|
||||
}
|
||||
|
||||
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
|
||||
public RequestPartitionId PartitionIdentifyCreate(IBaseResource theResource, ServletRequestDetails theRequestDetails) {
|
||||
assertNotNull(theResource);
|
||||
assertTrue(!myCreateRequestPartitionIds.isEmpty(), "No create partitions left in interceptor");
|
||||
RequestPartitionId retVal = myCreateRequestPartitionIds.remove(0);
|
||||
ourLog.debug("Returning partition [{}] for create of resource {} with date {}", retVal, theResource, retVal.getPartitionDate());
|
||||
return retVal;
|
||||
}
|
||||
|
||||
public void assertNoRemainingIds() {
|
||||
assertEquals(0, myReadRequestPartitionIds.size(), () -> "Found " + myReadRequestPartitionIds.size() + " READ partitions remaining in interceptor");
|
||||
assertEquals(0, myCreateRequestPartitionIds.size(), () -> "Still have " + myCreateRequestPartitionIds.size() + " CREATE partitions remaining in interceptor");
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,323 @@
|
|||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR JPA Server Test Utilities
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.jpa.cosmos;
|
||||
|
||||
import ca.uhn.fhir.batch2.jobs.config.Batch2JobsConfig;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
|
||||
import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
|
||||
import ca.uhn.fhir.jpa.binary.api.IBinaryStorageSvc;
|
||||
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
|
||||
import ca.uhn.fhir.jpa.config.PackageLoaderConfig;
|
||||
import ca.uhn.fhir.jpa.config.r4.JpaR4Config;
|
||||
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
|
||||
import ca.uhn.fhir.jpa.dao.TestDaoSearch;
|
||||
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
|
||||
import ca.uhn.fhir.jpa.model.dialect.HapiFhirPostgresDialect;
|
||||
import ca.uhn.fhir.jpa.searchparam.config.NicknameServiceConfig;
|
||||
import ca.uhn.fhir.jpa.test.config.BlockLargeNumbersOfParamsListener;
|
||||
import ca.uhn.fhir.jpa.test.config.ConnectionWrapper;
|
||||
import ca.uhn.fhir.jpa.test.config.MandatoryTransactionListener;
|
||||
import ca.uhn.fhir.jpa.test.config.TestHSearchAddInConfig;
|
||||
import ca.uhn.fhir.jpa.test.config.TestHapiJpaConfig;
|
||||
import ca.uhn.fhir.jpa.test.config.TestJPAConfig;
|
||||
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
|
||||
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
|
||||
import ca.uhn.fhir.system.HapiTestSystemProperties;
|
||||
import ca.uhn.fhir.validation.ResultSeverityEnum;
|
||||
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
|
||||
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
|
||||
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.hl7.fhir.common.hapi.validation.validator.FhirInstanceValidator;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static ca.uhn.fhir.jpa.test.config.TestR5Config.SELECT_QUERY_INCLUSION_CRITERIA_EXCLUDING_SEQUENCE_QUERIES;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
@Configuration
|
||||
@Import({
|
||||
JpaR4Config.class,
|
||||
PackageLoaderConfig.class,
|
||||
TestHapiJpaConfig.class,
|
||||
TestJPAConfig.class,
|
||||
TestHSearchAddInConfig.DefaultLuceneHeap.class,
|
||||
JpaBatch2Config.class,
|
||||
Batch2JobsConfig.class,
|
||||
NicknameServiceConfig.class,
|
||||
TestDaoSearch.Config.class
|
||||
})
|
||||
public class TestCosmosR4Config {
|
||||
|
||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestCosmosR4Config.class);
|
||||
public static Integer ourMaxThreads;
|
||||
private final AtomicInteger myBorrowedConnectionCount = new AtomicInteger(0);
|
||||
private final AtomicInteger myReturnedConnectionCount = new AtomicInteger(0);
|
||||
|
||||
static {
|
||||
/*
|
||||
* We use a randomized number of maximum threads in order to try
|
||||
* and catch any potential deadlocks caused by database connection
|
||||
* starvation
|
||||
*/
|
||||
if (ourMaxThreads == null) {
|
||||
ourMaxThreads = (int) (Math.random() * 6.0) + 3;
|
||||
|
||||
if (HapiTestSystemProperties.isSingleDbConnectionEnabled()) {
|
||||
ourMaxThreads = 1;
|
||||
}
|
||||
if (HapiTestSystemProperties.isUnlimitedDbConnectionsEnabled()) {
|
||||
ourMaxThreads = 100;
|
||||
}
|
||||
}
|
||||
ourLog.warn("ourMaxThreads={}", ourMaxThreads);
|
||||
}
|
||||
|
||||
private Map<Connection, Exception> myConnectionRequestStackTraces = Collections.synchronizedMap(new LinkedHashMap<>());
|
||||
|
||||
@Autowired
|
||||
TestHSearchAddInConfig.IHSearchConfigurer hibernateSearchConfigurer;
|
||||
private boolean myHaveDumpedThreads;
|
||||
@Autowired
|
||||
private JpaStorageSettings myStorageSettings;
|
||||
private Properties myDbProps;
|
||||
|
||||
@Bean
|
||||
public CircularQueueCaptureQueriesListener captureQueriesListener() {
|
||||
return new CircularQueueCaptureQueriesListener()
|
||||
.setSelectQueryInclusionCriteria(SELECT_QUERY_INCLUSION_CRITERIA_EXCLUDING_SEQUENCE_QUERIES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DataSource dataSource() {
|
||||
BasicDataSource retVal = new BasicDataSource() {
|
||||
|
||||
@Override
|
||||
public Connection getConnection() {
|
||||
ConnectionWrapper retVal;
|
||||
try {
|
||||
retVal = new ConnectionWrapper(super.getConnection());
|
||||
} catch (Exception e) {
|
||||
ourLog.error("Exceeded maximum wait for connection (" + ourMaxThreads + " max)", e);
|
||||
ourLog.info("Have {} outstanding - {} borrowed {} returned", (myBorrowedConnectionCount.get() - myReturnedConnectionCount.get()), myBorrowedConnectionCount.get(), myReturnedConnectionCount.get());
|
||||
logGetConnectionStackTrace();
|
||||
fail("Exceeded maximum wait for connection (" + ourMaxThreads + " max): " + e);
|
||||
retVal = null;
|
||||
}
|
||||
|
||||
try {
|
||||
throw new Exception();
|
||||
} catch (Exception e) {
|
||||
myConnectionRequestStackTraces.put(retVal, e);
|
||||
}
|
||||
|
||||
myBorrowedConnectionCount.incrementAndGet();
|
||||
ConnectionWrapper finalRetVal = retVal;
|
||||
return new ConnectionWrapper(finalRetVal){
|
||||
@Override
|
||||
public void close() throws SQLException {
|
||||
myConnectionRequestStackTraces.remove(finalRetVal);
|
||||
myReturnedConnectionCount.incrementAndGet();
|
||||
super.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void logGetConnectionStackTrace() {
|
||||
StringBuilder b = new StringBuilder();
|
||||
ArrayList<Exception> stackTraces = new ArrayList<>(myConnectionRequestStackTraces.values());
|
||||
|
||||
for (int i = 0; i < stackTraces.size(); i++) {
|
||||
Exception nextStack = stackTraces.get(i);
|
||||
b.append("\nPrevious request stack trace ");
|
||||
b.append(i);
|
||||
b.append(":");
|
||||
for (StackTraceElement next : nextStack.getStackTrace()) {
|
||||
b.append("\n ");
|
||||
b.append(next.getClassName());
|
||||
b.append(".");
|
||||
b.append(next.getMethodName());
|
||||
b.append("(");
|
||||
b.append(next.getFileName());
|
||||
b.append(":");
|
||||
b.append(next.getLineNumber());
|
||||
b.append(")");
|
||||
}
|
||||
b.append("\n");
|
||||
}
|
||||
ourLog.info(b.toString());
|
||||
|
||||
if (!myHaveDumpedThreads) {
|
||||
ourLog.info("Thread dump:" + crunchifyGenerateThreadDump());
|
||||
myHaveDumpedThreads = true;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
setConnectionProperties(retVal);
|
||||
|
||||
SLF4JLogLevel level = SLF4JLogLevel.INFO;
|
||||
DataSource dataSource = ProxyDataSourceBuilder
|
||||
.create(retVal)
|
||||
// .logQueryBySlf4j(level)
|
||||
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS, level)
|
||||
.beforeQuery(new BlockLargeNumbersOfParamsListener())
|
||||
.beforeQuery(getMandatoryTransactionListener())
|
||||
.afterQuery(captureQueriesListener())
|
||||
.afterQuery(new CurrentThreadCaptureQueriesListener())
|
||||
.countQuery(singleQueryCountHolder())
|
||||
.afterMethod(captureQueriesListener())
|
||||
.build();
|
||||
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
|
||||
public void setConnectionProperties(BasicDataSource theDataSource) {
|
||||
|
||||
if (myDbProps == null) {
|
||||
myDbProps = new Properties();
|
||||
try {
|
||||
myDbProps.load(new FileReader("/tmp/cosmos.properties", StandardCharsets.UTF_8));
|
||||
} catch (IOException e) {
|
||||
myDbProps = null;
|
||||
throw new InternalErrorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
theDataSource.setDriver(new org.postgresql.Driver());
|
||||
theDataSource.setUrl(myDbProps.getProperty("url"));
|
||||
theDataSource.setMaxWaitMillis(30000);
|
||||
theDataSource.setUsername(myDbProps.getProperty("user"));
|
||||
theDataSource.setPassword(myDbProps.getProperty("pass"));
|
||||
theDataSource.setMaxTotal(ourMaxThreads);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public SingleQueryCountHolder singleQueryCountHolder() {
|
||||
return new SingleQueryCountHolder();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProxyDataSourceBuilder.SingleQueryExecution getMandatoryTransactionListener() {
|
||||
return new MandatoryTransactionListener();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public LocalContainerEntityManagerFactoryBean entityManagerFactory(ConfigurableListableBeanFactory theConfigurableListableBeanFactory, FhirContext theFhirContext, JpaStorageSettings theStorageSettings) {
|
||||
LocalContainerEntityManagerFactoryBean retVal = HapiEntityManagerFactoryUtil.newEntityManagerFactory(theConfigurableListableBeanFactory, theFhirContext, theStorageSettings);
|
||||
retVal.setPersistenceUnitName("PU_HapiFhirJpaR4");
|
||||
retVal.setDataSource(dataSource());
|
||||
retVal.setJpaProperties(jpaProperties());
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private Properties jpaProperties() {
|
||||
Properties extraProperties = new Properties();
|
||||
extraProperties.put("hibernate.format_sql", "false");
|
||||
extraProperties.put("hibernate.show_sql", "false");
|
||||
extraProperties.put("hibernate.hbm2ddl.auto", "none");
|
||||
extraProperties.put("hibernate.dialect", getHibernateDialect());
|
||||
|
||||
hibernateSearchConfigurer.apply(extraProperties);
|
||||
|
||||
ourLog.info("jpaProperties: {}", extraProperties);
|
||||
|
||||
return extraProperties;
|
||||
}
|
||||
|
||||
public String getHibernateDialect() {
|
||||
return HapiFhirPostgresDialect.class.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Bean which validates incoming requests
|
||||
*/
|
||||
@Bean
|
||||
@Lazy
|
||||
public RequestValidatingInterceptor requestValidatingInterceptor(FhirInstanceValidator theFhirInstanceValidator) {
|
||||
RequestValidatingInterceptor requestValidator = new RequestValidatingInterceptor();
|
||||
requestValidator.setFailOnSeverity(ResultSeverityEnum.ERROR);
|
||||
requestValidator.setAddResponseHeaderOnSeverity(null);
|
||||
requestValidator.setAddResponseOutcomeHeaderOnSeverity(ResultSeverityEnum.INFORMATION);
|
||||
requestValidator.addValidatorModule(theFhirInstanceValidator);
|
||||
|
||||
return requestValidator;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IBinaryStorageSvc binaryStorage() {
|
||||
return new MemoryBinaryStorageSvcImpl();
|
||||
}
|
||||
|
||||
public static String crunchifyGenerateThreadDump() {
|
||||
final StringBuilder dump = new StringBuilder();
|
||||
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
|
||||
final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
|
||||
for (ThreadInfo threadInfo : threadInfos) {
|
||||
dump.append('"');
|
||||
dump.append(threadInfo.getThreadName());
|
||||
dump.append("\" ");
|
||||
final Thread.State state = threadInfo.getThreadState();
|
||||
dump.append("\n java.lang.Thread.State: ");
|
||||
dump.append(state);
|
||||
final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
|
||||
for (final StackTraceElement stackTraceElement : stackTraceElements) {
|
||||
dump.append("\n at ");
|
||||
dump.append(stackTraceElement);
|
||||
}
|
||||
dump.append("\n\n");
|
||||
}
|
||||
return dump.toString();
|
||||
}
|
||||
|
||||
public static int getMaxThreads() {
|
||||
return ourMaxThreads;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,3 +1,22 @@
|
|||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR - Clinical Reasoning
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.cr.r4;
|
||||
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
|
|
|
@ -1,3 +1,22 @@
|
|||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR - Clinical Reasoning
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.cr.r4;
|
||||
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
|
|
|
@ -1,3 +1,22 @@
|
|||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR - Clinical Reasoning
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.cr.r4.measure;
|
||||
|
||||
import ca.uhn.fhir.cr.r4.ICollectDataServiceFactory;
|
||||
|
|
|
@ -1,3 +1,22 @@
|
|||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR - Clinical Reasoning
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package ca.uhn.fhir.cr.r4.measure;
|
||||
|
||||
import ca.uhn.fhir.cr.r4.IDataRequirementsServiceFactory;
|
||||
|
|
Loading…
Reference in New Issue