Allow no read interceptor for partition (#1851)

* Allow no read interceptor for partition

* Add changelog

* Test fix
This commit is contained in:
James Agnew 2020-05-19 17:06:56 -04:00 committed by GitHub
parent f3c023bb6f
commit c9c90b327c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 28 deletions

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 1851
title: "In HAPI FHIR 5.0.0, partitioned JPA servers were introduced. The documentation claimed that an interceptor implementing
the STORAGE_PARTITION_IDENTIFY_READ was optional, but the server incorrectly made this mandatory. This has been
corrected."

View File

@ -42,6 +42,7 @@ import java.util.HashSet;
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks;
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooksAndReturnObject;
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.hasHooks;
public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
@ -74,7 +75,10 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
}
/**
* Invoke the <code>STORAGE_PARTITION_IDENTIFY_READ</code> interceptor pointcut to determine the tenant for a read request
* Invoke the {@link Pointcut#STORAGE_PARTITION_IDENTIFY_READ} interceptor pointcut to determine the tenant for a read request.
*
* If no interceptors are registered with a hook for {@link Pointcut#STORAGE_PARTITION_IDENTIFY_READ}, return
* {@link RequestPartitionId#allPartitions()} instead.
*/
@Nonnull
@Override
@ -88,10 +92,14 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
}
// Interceptor call: STORAGE_PARTITION_IDENTIFY_READ
if (hasHooks(Pointcut.STORAGE_PARTITION_IDENTIFY_READ, myInterceptorBroadcaster, theRequest)) {
HookParams params = new HookParams()
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
requestPartitionId = (RequestPartitionId) doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PARTITION_IDENTIFY_READ, params);
} else {
requestPartitionId = RequestPartitionId.allPartitions();
}
validatePartition(requestPartitionId, theResourceType, Pointcut.STORAGE_PARTITION_IDENTIFY_READ);
@ -102,7 +110,7 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
}
/**
* Invoke the <code>STORAGE_PARTITION_IDENTIFY_CREATE</code> interceptor pointcut to determine the tenant for a read request
* Invoke the {@link Pointcut#STORAGE_PARTITION_IDENTIFY_CREATE} interceptor pointcut to determine the tenant for a create request.
*/
@Nonnull
@Override
@ -179,7 +187,7 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
}
private void validatePartition(@Nullable RequestPartitionId theRequestPartitionId, @Nonnull String theResourceName, Pointcut thePointcut) {
private void validatePartition(@Nonnull RequestPartitionId theRequestPartitionId, @Nonnull String theResourceName, Pointcut thePointcut) {
Validate.notNull(theRequestPartitionId, "Interceptor did not provide a value for pointcut: %s", thePointcut);
if (theRequestPartitionId.getPartitionId() != null) {

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParamConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.DateAndListParam;
@ -71,6 +72,7 @@ import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.util.TestUtil.sleepAtLeast;
import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
@ -89,7 +91,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(PartitioningR4Test.class);
private MyInterceptor myPartitionInterceptor;
private MyReadWriteInterceptor myPartitionInterceptor;
private LocalDate myPartitionDate;
private LocalDate myPartitionDate2;
private int myPartitionId;
@ -106,7 +108,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
myPartitionSettings.setAllowReferencesAcrossPartitions(new PartitionSettings().getAllowReferencesAcrossPartitions());
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyInterceptor);
myInterceptorRegistry.unregisterInterceptorsIf(t -> t instanceof MyReadWriteInterceptor);
myInterceptor = null;
if (myHaveDroppedForcedIdUniqueConstraint) {
@ -136,7 +138,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
myPartitionId = 1;
myPartitionId2 = 2;
myPartitionInterceptor = new MyInterceptor();
myPartitionInterceptor = new MyReadWriteInterceptor();
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("PART-1"));
@ -166,6 +168,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
@Test
public void testCreate_CrossPartitionReference_ByPid_Allowed() {
myPartitionSettings.setAllowReferencesAcrossPartitions(PartitionSettings.CrossPartitionReferenceMode.ALLOWED_UNQUALIFIED);
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.DISABLED);
// Create patient in partition 1
addCreatePartition(myPartitionId, myPartitionDate);
@ -177,8 +180,18 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
addCreatePartition(myPartitionId2, myPartitionDate2);
Observation obs = new Observation();
obs.getSubject().setReference(patientId.getValue());
myCaptureQueriesListener.clear();
IIdType obsId = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread();
assertEquals(2, selectQueries.size());
// Look up the partition
assertThat(selectQueries.get(0).getSql(true,false).toLowerCase(), containsString(" from hfj_partition "));
// Look up the referenced subject/patient
assertThat(selectQueries.get(1).getSql(true,false).toLowerCase(), containsString(" from hfj_resource "));
assertEquals(0, StringUtils.countMatches(selectQueries.get(1).getSql(true,false).toLowerCase(), "partition"));
runInTransaction(() -> {
List<ResourceLink> resLinks = myResourceLinkDao.findAll();
ourLog.info("Resource links:\n{}", resLinks.toString());
@ -691,7 +704,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
@Test
public void testRead_PidId_AllPartitions() {
IIdType patientId1 = createPatient(withPartition(1), withActiveTrue());
IIdType patientId2 = createPatient(withPartition(2) , withActiveTrue());
IIdType patientId2 = createPatient(withPartition(2), withActiveTrue());
{
addReadAllPartitions();
@ -764,6 +777,44 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
}
}
@Test
public void testRead_PidId_AllPartitionsBecauseNoReadInterceptor() {
IIdType patientId1 = createPatient(withPartition(1), withActiveTrue());
IIdType patientId2 = createPatient(withPartition(2), withActiveTrue());
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
MyWriteInterceptor writeInterceptor = new MyWriteInterceptor();
myInterceptorRegistry.registerInterceptor(writeInterceptor);
try {
{
myCaptureQueriesListener.clear();
IdType gotId1 = myPatientDao.read(patientId1, mySrd).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId1, gotId1);
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, true);
ourLog.info("Search SQL:\n{}", searchSql);
// Only the read columns should be used, no criteria use partition
assertEquals(2, StringUtils.countMatches(searchSql, "PARTITION_ID as "));
assertEquals(2, StringUtils.countMatches(searchSql, "PARTITION_ID"));
}
{
IdType gotId2 = myPatientDao.read(patientId2, mySrd).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId2, gotId2);
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, true);
ourLog.info("Search SQL:\n{}", searchSql);
// Only the read columns should be used, no criteria use partition
assertEquals(2, StringUtils.countMatches(searchSql, "PARTITION_ID as "));
assertEquals(2, StringUtils.countMatches(searchSql, "PARTITION_ID"));
}
} finally {
myInterceptorRegistry.unregisterInterceptor(writeInterceptor);
}
}
@Test
public void testRead_PidId_DefaultPartition() {
IIdType patientIdNull = createPatient(withPartition(null), withActiveTrue());
@ -1261,7 +1312,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
// Date param
ourLog.info("Date indexes:\n * {}", myResourceIndexedSearchParamDateDao.findAll().stream().map(t->t.toString()).collect(Collectors.joining("\n * ")));
ourLog.info("Date indexes:\n * {}", myResourceIndexedSearchParamDateDao.findAll().stream().map(t -> t.toString()).collect(Collectors.joining("\n * ")));
addReadPartition(1);
myCaptureQueriesListener.clear();
SearchParameterMap map = new SearchParameterMap();
@ -2206,7 +2257,7 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
// Resolve forced IDs
sql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(2).getSql(true, true);
assertEquals(sql, 1, countMatches(sql, "forcedid0_.RESOURCE_PID in"));
assertEquals(sql,0, countMatches(sql, "PARTITION_ID is null"));
assertEquals(sql, 0, countMatches(sql, "PARTITION_ID is null"));
}
@Test
@ -2322,20 +2373,40 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
}
@Interceptor
public static class MyInterceptor {
public static class MyReadWriteInterceptor extends MyWriteInterceptor {
private final List<RequestPartitionId> myCreateRequestPartitionIds = new ArrayList<>();
private final List<RequestPartitionId> myReadRequestPartitionIds = new ArrayList<>();
public void addCreatePartition(RequestPartitionId theRequestPartitionId) {
myCreateRequestPartitionIds.add(theRequestPartitionId);
}
public void addReadPartition(RequestPartitionId theRequestPartitionId) {
myReadRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId PartitionIdentifyRead(ServletRequestDetails theRequestDetails) {
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
ourLog.info("Returning partition for read: {}", retVal);
return retVal;
}
@Override
public void assertNoRemainingIds() {
super.assertNoRemainingIds();
assertEquals(0, myReadRequestPartitionIds.size());
}
}
@Interceptor
public static class MyWriteInterceptor {
private final List<RequestPartitionId> myCreateRequestPartitionIds = new ArrayList<>();
public void addCreatePartition(RequestPartitionId theRequestPartitionId) {
myCreateRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId PartitionIdentifyCreate(IBaseResource theResource, ServletRequestDetails theRequestDetails) {
assertNotNull(theResource);
@ -2344,16 +2415,8 @@ public class PartitioningR4Test extends BaseJpaR4SystemTest implements ITestData
return retVal;
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId PartitionIdentifyRead(ServletRequestDetails theRequestDetails) {
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
ourLog.info("Returning partition for read: {}", retVal);
return retVal;
}
public void assertNoRemainingIds() {
assertEquals(0, myCreateRequestPartitionIds.size());
assertEquals(0, myReadRequestPartitionIds.size());
}
}

View File

@ -115,7 +115,7 @@ public class WebsocketWithSubscriptionIdR4Test extends BaseResourceProviderR4Tes
ClientUpgradeRequest request = new ClientUpgradeRequest();
ourLog.info("Connecting to : {}", echoUri);
Future<Session> connection = myWebSocketClient.connect(mySocketImplementation, echoUri, request);
Session session = connection.get(2, TimeUnit.SECONDS);
Session session = connection.get(10, TimeUnit.SECONDS);
ourLog.info("Connected to WS: {}", session.isOpen());