pointcut exceptions (#4822)

* enforce no exceptions on the invoking thread in PointCutLatch

* comment

* Msg.code

* Catch exceptions outside of await block

* fix migration issue

* Refactor PointcutLatch and add tests

* fix tests

* fix tests

* fix tests

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test!

* fix test!

* fix test

* fix test

* clean up latch error formatting

* fix test

* fix test

* fix test

* fix test

* fix test

* fix tests

* fix test

* fix test

* Remove hapi-fhir-jpaserver-uhnfhirtest from build as it is crashing CI

* fix test

* fix intermittent

* A SNAPSHOT dependency on an upstream module was added to HAPI FHIR, which changed and caused some downstream tests to break.

* fix intermittent

* Add Maven enforcer plugin for CR dependencies

* Make maven enforcer conditional on CI builds

* Remove hapi-fhir-jpaserver-uhnfhirtest from build as it is crashing CI

* improve test logging

* pre-review cleanup

* review feedback

* remove hapi-fhir-base-test-jaxrsserver-kotlin from the build

---------

Co-authored-by: Ken Stevens <ken@smilecdr.com>
Co-authored-by: Jonathan Percival <jonathan.i.percival@gmail.com>
This commit is contained in:
Ken Stevens 2023-05-11 22:31:38 -04:00 committed by GitHub
parent 0475cb682f
commit 4313dc9958
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 631 additions and 276 deletions

View File

@ -129,8 +129,8 @@ stages:
module: hapi-tinder-plugin
- name: hapi_tinder_test
module: hapi-tinder-test
- name: tests_hapi_fhir_base_test_jaxrsserver_kotlin
module: tests/hapi-fhir-base-test-jaxrsserver-kotlin
# - name: tests_hapi_fhir_base_test_jaxrsserver_kotlin
# module: tests/hapi-fhir-base-test-jaxrsserver-kotlin
- name: tests_hapi_fhir_base_test_mindeps_client
module: tests/hapi-fhir-base-test-mindeps-client
- name: tests_hapi_fhir_base_test_mindeps_server

View File

@ -23,8 +23,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import javax.annotation.Nonnull;
import java.util.Collection;
@ -136,8 +134,14 @@ public class HookParams {
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SIMPLE_STYLE)
.append("params", myParams)
.toString();
StringBuilder b = new StringBuilder();
myParams.forEach((key, value) -> {
b.append(" ")
.append(key.getSimpleName())
.append(": ")
.append(value)
.append("\n");
});
return b.toString();
}
}

View File

@ -2,8 +2,8 @@ package ca.uhn.fhir.cli;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.CapturingInterceptor;
import ca.uhn.fhir.test.utilities.TlsAuthenticationTestHelper;
import ca.uhn.fhir.test.utilities.RestServerR4Helper;
import ca.uhn.fhir.test.utilities.TlsAuthenticationTestHelper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.ParseException;
@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class ExampleDataUploaderTest {
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper();
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newWithTransactionLatch();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();
@ -46,7 +46,8 @@ class ExampleDataUploaderTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHeaderPassthrough(boolean theIncludeTls) throws ParseException {
public void testHeaderPassthrough(boolean theIncludeTls) throws ParseException, InterruptedException {
// setup
String headerKey = "test-header-key";
String headerValue = "test header value";
@ -60,8 +61,11 @@ class ExampleDataUploaderTest {
);
final CommandLine commandLine = new DefaultParser().parse(testedCommand.getOptions(), args, true);
testedCommand.run(commandLine);
// execute
myRestServerR4Helper.executeWithLatch(() -> runCommand(commandLine));
// validate
assertNotNull(myCapturingInterceptor.getLastRequest());
Map<String, List<String>> allHeaders = myCapturingInterceptor.getLastRequest().getAllHeaders();
assertFalse(allHeaders.isEmpty());
@ -78,6 +82,14 @@ class ExampleDataUploaderTest {
assertEquals("EX3152", resource.getIdElement().getIdPart());
}
private void runCommand(CommandLine commandLine) {
try {
testedCommand.run(commandLine);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
private static class RequestCapturingExampleDataUploader extends ExampleDataUploader {
private final CapturingInterceptor myCapturingInterceptor;

View File

@ -43,7 +43,7 @@ public class ExportConceptMapToCsvCommandDstu3Test {
}
@RegisterExtension
public final RestServerDstu3Helper myRestServerDstu3Helper = new RestServerDstu3Helper(true);
public final RestServerDstu3Helper myRestServerDstu3Helper = RestServerDstu3Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -40,7 +40,7 @@ public class ExportConceptMapToCsvCommandR4Test {
}
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper(true);
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -45,7 +45,7 @@ public class ImportCsvToConceptMapCommandDstu3Test {
}
@RegisterExtension
public final RestServerDstu3Helper myRestServerDstu3Helper = new RestServerDstu3Helper(true);
public final RestServerDstu3Helper myRestServerDstu3Helper = RestServerDstu3Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -50,7 +50,7 @@ public class ImportCsvToConceptMapCommandR4Test {
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper(true);
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -39,7 +39,7 @@ class ReindexTerminologyCommandTest {
private BaseJpaSystemProvider<?, ?> myProvider = spy(new BaseJpaSystemProvider<>() {});
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper(true);
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -104,9 +104,9 @@ public class UploadTerminologyCommandTest {
}
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper(true);
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newInitialized();
@RegisterExtension
public final RestServerDstu3Helper myRestServerDstu3Helper = new RestServerDstu3Helper(true);
public final RestServerDstu3Helper myRestServerDstu3Helper = RestServerDstu3Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -166,7 +166,6 @@ import java.util.stream.Collectors;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.BooleanUtils.isFalse;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.left;

View File

@ -7,7 +7,6 @@ import org.hl7.fhir.instance.model.api.IBaseBooleanDatatype;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

View File

@ -20,6 +20,7 @@ import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
import java.util.function.Supplier;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.when;
@ -108,4 +109,19 @@ public abstract class BaseMdmHelper implements BeforeEachCallback, AfterEachCall
public PointcutLatch getAfterMdmLatch() {
return myAfterMdmLatch;
}
/**
* Expect 1 call to the MDM_AFTER_PERSISTED_RESOURCE_CHECKED pointcut when calling theSupplier. Wait until
* the mdm message arrives and this pointcut is called before returning the result of theSupplier.
* @param theSupplier
* @return
* @param <T>
* @throws InterruptedException
*/
public <T> T executeWithLatch(Supplier<T> theSupplier) throws InterruptedException {
myAfterMdmLatch.setExpectedCount(1);
T retval = theSupplier.get();
myAfterMdmLatch.awaitExpected();
return retval;
}
}

View File

@ -1,15 +1,20 @@
package ca.uhn.fhir.jpa.mdm.helper;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.mdm.api.MdmLinkEvent;
import ca.uhn.fhir.rest.server.TransactionLogMessages;
import ca.uhn.fhir.rest.server.messaging.ResourceOperationMessage;
import ca.uhn.test.concurrency.PointcutLatch;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.List;
import static ca.uhn.fhir.mdm.api.MdmConstants.CODE_GOLDEN_RECORD;
import static ca.uhn.fhir.mdm.api.MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS;
@ -27,8 +32,8 @@ public class MdmHelperR4 extends BaseMdmHelper {
public OutcomeAndLogMessageWrapper createWithLatch(IBaseResource theBaseResource, boolean isExternalHttpRequest) throws InterruptedException {
myAfterMdmLatch.setExpectedCount(1);
DaoMethodOutcome daoMethodOutcome = doCreateResource(theBaseResource, isExternalHttpRequest);
myAfterMdmLatch.awaitExpected();
return new OutcomeAndLogMessageWrapper(daoMethodOutcome, myAfterMdmLatch.getLatchInvocationParameterOfType(TransactionLogMessages.class));
List<HookParams> hookParams = myAfterMdmLatch.awaitExpected();
return new OutcomeAndLogMessageWrapper(daoMethodOutcome, hookParams);
}
public OutcomeAndLogMessageWrapper updateWithLatch(IBaseResource theIBaseResource) throws InterruptedException {
@ -38,8 +43,8 @@ public class MdmHelperR4 extends BaseMdmHelper {
public OutcomeAndLogMessageWrapper updateWithLatch(IBaseResource theIBaseResource, boolean isExternalHttpRequest) throws InterruptedException {
myAfterMdmLatch.setExpectedCount(1);
DaoMethodOutcome daoMethodOutcome = doUpdateResource(theIBaseResource, isExternalHttpRequest);
myAfterMdmLatch.awaitExpected();
return new OutcomeAndLogMessageWrapper(daoMethodOutcome, myAfterMdmLatch.getLatchInvocationParameterOfType(TransactionLogMessages.class));
List<HookParams> hookParams = myAfterMdmLatch.awaitExpected();
return new OutcomeAndLogMessageWrapper(daoMethodOutcome, hookParams);
}
public DaoMethodOutcome doCreateResource(IBaseResource theResource, boolean isExternalHttpRequest) {
@ -68,12 +73,12 @@ public class MdmHelperR4 extends BaseMdmHelper {
* by the MDM module.
*/
public class OutcomeAndLogMessageWrapper {
DaoMethodOutcome myDaoMethodOutcome;
TransactionLogMessages myLogMessages;
private final DaoMethodOutcome myDaoMethodOutcome;
private final List<HookParams> myHookParams;
private OutcomeAndLogMessageWrapper(DaoMethodOutcome theDaoMethodOutcome, TransactionLogMessages theTransactionLogMessages) {
public OutcomeAndLogMessageWrapper(DaoMethodOutcome theDaoMethodOutcome, List<HookParams> theHookParams) {
myDaoMethodOutcome = theDaoMethodOutcome;
myLogMessages = theTransactionLogMessages;
myHookParams = theHookParams;
}
public DaoMethodOutcome getDaoMethodOutcome() {
@ -81,7 +86,19 @@ public class MdmHelperR4 extends BaseMdmHelper {
}
public TransactionLogMessages getLogMessages() {
return myLogMessages;
return PointcutLatch.getInvocationParameterOfType(myHookParams, TransactionLogMessages.class);
}
public List<HookParams> getHookParams() {
return myHookParams;
}
public MdmLinkEvent getMdmLinkEvent() {
return PointcutLatch.getInvocationParameterOfType(myHookParams, MdmLinkEvent.class);
}
public ResourceOperationMessage getResourceOperationMessage() {
return PointcutLatch.getInvocationParameterOfType(myHookParams, ResourceOperationMessage.class);
}
}

View File

@ -58,9 +58,9 @@ public class MdmEventIT extends BaseMdmR4Test {
addExternalEID(patient2, "eid-11");
addExternalEID(patient2, "eid-22");
myMdmHelper.updateWithLatch(patient2);
MdmHelperR4.OutcomeAndLogMessageWrapper outcome = myMdmHelper.updateWithLatch(patient2);
MdmLinkEvent linkChangeEvent = myMdmHelper.getAfterMdmLatch().getLatchInvocationParameterOfType(MdmLinkEvent.class);
MdmLinkEvent linkChangeEvent = outcome.getMdmLinkEvent();
assertNotNull(linkChangeEvent);
ourLog.info("Got event: {}", linkChangeEvent);
@ -84,15 +84,15 @@ public class MdmEventIT extends BaseMdmR4Test {
@Test
public void testCreateLinkChangeEvent() throws InterruptedException {
Practitioner pr = buildPractitionerWithNameAndId("Young", "AC-DC");
myMdmHelper.createWithLatch(pr);
MdmHelperR4.OutcomeAndLogMessageWrapper outcome = myMdmHelper.createWithLatch(pr);
ResourceOperationMessage resourceOperationMessage = myMdmHelper.getAfterMdmLatch().getLatchInvocationParameterOfType(ResourceOperationMessage.class);
ResourceOperationMessage resourceOperationMessage = outcome.getResourceOperationMessage();
assertNotNull(resourceOperationMessage);
assertEquals(pr.getIdElement().toUnqualifiedVersionless().getValue(), resourceOperationMessage.getId());
MdmLink link = getLinkByTargetId(pr);
MdmLinkEvent linkChangeEvent = myMdmHelper.getAfterMdmLatch().getLatchInvocationParameterOfType(MdmLinkEvent.class);
MdmLinkEvent linkChangeEvent = outcome.getMdmLinkEvent();
assertNotNull(linkChangeEvent);
assertEquals(1, linkChangeEvent.getMdmLinks().size());
@ -110,9 +110,9 @@ public class MdmEventIT extends BaseMdmR4Test {
@Test
public void testUpdateLinkChangeEvent() throws InterruptedException {
Patient patient1 = addExternalEID(buildJanePatient(), "eid-1");
myMdmHelper.createWithLatch(patient1);
MdmHelperR4.OutcomeAndLogMessageWrapper outcome = myMdmHelper.createWithLatch(patient1);
MdmLinkEvent linkChangeEvent = myMdmHelper.getAfterMdmLatch().getLatchInvocationParameterOfType(MdmLinkEvent.class);
MdmLinkEvent linkChangeEvent = outcome.getMdmLinkEvent();
assertNotNull(linkChangeEvent);
assertEquals(1, linkChangeEvent.getMdmLinks().size());

View File

@ -257,12 +257,11 @@ public class MdmSearchExpandingInterceptorIT extends BaseMdmR4Test {
}
@Test
public void testReferenceExpansionQuietlyFailsOnMissingMdmMatches() {
public void testReferenceExpansionQuietlyFailsOnMissingMdmMatches() throws InterruptedException {
myStorageSettings.setAllowMdmExpansion(true);
Patient patient = buildJanePatient();
patient.getMeta().addTag(MdmConstants.SYSTEM_MDM_MANAGED, MdmConstants.CODE_NO_MDM_MANAGED, "Don't MDM on me!");
DaoMethodOutcome daoMethodOutcome = myMdmHelper.doCreateResource(patient, true);
String id = daoMethodOutcome.getId().getIdPart();
String id = myMdmHelper.executeWithLatch(() -> myMdmHelper.doCreateResource(patient, true)).getId().getIdPart();
createObservationWithSubject(id);
//Even though the user has NO mdm links, that should not cause a request failure.

View File

@ -32,7 +32,7 @@ public abstract class BaseProviderR4Test extends BaseMdmR4Test {
@Autowired
private IMdmSubmitSvc myMdmSubmitSvc;
@Autowired
private MdmSettings myMdmSettings;
protected MdmSettings myMdmSettings;
@Autowired
private MdmControllerHelper myMdmHelper;
@Autowired

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.mdm.api.MdmConstants;
@ -73,6 +73,7 @@ public class MdmProviderCreateLinkR4Test extends BaseLinkR4Test {
@Test
public void testCreateLinkWithMatchResultOnDifferentPartitions() {
myPartitionSettings.setPartitioningEnabled(true);
myMdmSettings.setSearchAllPartitionForMatch(false);
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1), null);
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2), null);
assertLinkCount(1);

View File

@ -148,6 +148,7 @@ public class MdmProviderMergeGoldenResourcesR4Test extends BaseProviderR4Test {
@Test
public void testMergeOnDifferentPartitions() {
myPartitionSettings.setPartitioningEnabled(true);
myMdmSettings.setSearchAllPartitionForMatch(false);
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1), null);
RequestPartitionId requestPartitionId1 = RequestPartitionId.fromPartitionId(1);
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2), null);

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,6 +61,7 @@ class SubscriptionChannelCache {
return myCache.containsKey(theChannelName);
}
@VisibleForTesting
void logForUnitTest() {
for (String key : myCache.keySet()) {
ourLog.info("SubscriptionChannelCache: {}", key);

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
@ -147,4 +148,9 @@ public class SubscriptionChannelRegistry {
public synchronized int size() {
return myDeliveryReceiverChannels.size();
}
@VisibleForTesting
public void logForUnitTest() {
myDeliveryReceiverChannels.logForUnitTest();
}
}

View File

@ -128,6 +128,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear();
ourObservationListener.clear();
mySubscriptionResourceMatched.clear();
mySubscriptionResourceNotMatched.clear();
super.clearRegistry();
}
@ -145,9 +147,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
}
protected Subscription sendSubscription(Subscription theSubscription, RequestPartitionId theRequestPartitionId, Boolean mockDao) throws InterruptedException {
mySubscriptionResourceNotMatched.setExpectedCount(1);
mySubscriptionActivatedPost.setExpectedCount(1);
Subscription retVal = sendResource(theSubscription, theRequestPartitionId);
mySubscriptionActivatedPost.awaitExpected();
mySubscriptionResourceNotMatched.awaitExpected();
return retVal;
}

View File

@ -35,7 +35,9 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
@ -58,7 +60,9 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
@ -82,7 +86,9 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
@ -120,7 +126,9 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
observation.setStatus(Observation.ObservationStatus.FINAL);
mySubscriptionResourceMatched.setExpectedCount(1);
sendResource(observation);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());

View File

@ -76,7 +76,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
@ -99,7 +101,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
@ -117,7 +121,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(1, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendResource(observation);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
@ -141,7 +147,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionAfterDelivery.setExpectedCount(1);
ourObservationListener.setExpectedCount(0);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.clear();
mySubscriptionAfterDelivery.awaitExpected();
@ -168,7 +176,9 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(3, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(2);
mySubscriptionResourceMatched.setExpectedCount(2);
sendObservation(code, "SNOMED-CT");
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
assertEquals(2, ourContentTypes.size());

View File

@ -86,7 +86,10 @@ public class ExpungeHookTest extends BaseJpaDstu3Test {
options.setExpungeEverything(true);
options.setExpungeDeletedResources(true);
options.setExpungeOldVersions(true);
// TODO KHS shouldn't this be 1? Investigate why is it 2?
myExpungeResourceLatch.setExpectedCount(2);
myPatientDao.expunge(id.toUnqualifiedVersionless(), options, mySrd);
myExpungeResourceLatch.awaitExpected();
assertPatientGone(id);
// Create it a second time.

View File

@ -229,7 +229,9 @@ public class ResourceChangeListenerRegistryImplIT extends BaseJpaR4Test {
assertEquals(1, myResourceChangeListenerRegistry.getResourceVersionCacheSizeForUnitTest());
otherTestCallback.setInitExpectedCount(1);
otherCache.forceRefresh();
otherTestCallback.awaitInitExpected();
assertEquals(2, myResourceChangeListenerRegistry.getResourceVersionCacheSizeForUnitTest());
myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(myMaleTestCallback);

View File

@ -42,24 +42,21 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
myStorageSettings.setEnforceReferentialIntegrityOnDelete(true);
}
PointcutLatch myPointcutLatch = new PointcutLatch(Pointcut.STORAGE_TRANSACTION_PROCESSED);
PointcutLatch myTransactionProcessedLatch = new PointcutLatch(Pointcut.STORAGE_TRANSACTION_PROCESSED);
@Autowired
private IInterceptorService myInterceptorService;
@BeforeEach
public void beforeEach() {
myInterceptorService.registerAnonymousInterceptor(Pointcut.STORAGE_TRANSACTION_PROCESSED, myPointcutLatch);
myInterceptorService.registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, myPointcutLatch);
myInterceptorService.registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED, myPointcutLatch);
myInterceptorService.registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED, myPointcutLatch);
myInterceptorService.registerAnonymousInterceptor(Pointcut.STORAGE_TRANSACTION_PROCESSED, myTransactionProcessedLatch);
}
@AfterEach
@Override
public void afterResetInterceptors() {
super.afterResetInterceptors();
myInterceptorService.unregisterInterceptor(myPointcutLatch);
myInterceptorService.unregisterInterceptor(myTransactionProcessedLatch);
}
@Test
@ -92,12 +89,9 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
//Delete an observation
b.addEntry().getRequest().setMethod(DELETE).setUrl(daoMethodOutcome.getId().toUnqualifiedVersionless().getValue());
List<HookParams> hookParams = callTransaction(b);
myPointcutLatch.setExpectedCount(4);
mySystemDao.transaction(mySrd, b);
List<HookParams> hookParams = myPointcutLatch.awaitExpected();
DeferredInterceptorBroadcasts broadcastsParam = hookParams.get(3).get(DeferredInterceptorBroadcasts.class);
DeferredInterceptorBroadcasts broadcastsParam = hookParams.get(0).get(DeferredInterceptorBroadcasts.class);
ListMultimap<Pointcut, HookParams> deferredInterceptorBroadcasts = broadcastsParam.getDeferredInterceptorBroadcasts();
assertThat(deferredInterceptorBroadcasts.entries(), hasSize(3));
@ -118,7 +112,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
}
@Test
public void testDeleteInTransactionShouldSucceedWhenReferencesAreAlsoRemoved() {
public void testDeleteInTransactionShouldSucceedWhenReferencesAreAlsoRemoved() throws InterruptedException {
final Observation obs1 = new Observation();
obs1.setStatus(Observation.ObservationStatus.FINAL);
IIdType obs1id = myObservationDao.create(obs1).getId().toUnqualifiedVersionless();
@ -141,7 +135,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
try {
// transaction should succeed because the DiagnosticReport which references obs2 is also deleted
mySystemDao.transaction(mySrd, b);
callTransaction(b);
} catch (ResourceVersionConflictException e) {
fail();
}
@ -149,7 +143,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
@Test
public void testDeleteWithHas_SourceModifiedToNoLongerIncludeReference() {
public void testDeleteWithHas_SourceModifiedToNoLongerIncludeReference() throws InterruptedException {
Observation obs1 = new Observation();
obs1.setStatus(Observation.ObservationStatus.FINAL);
@ -173,7 +167,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
Bundle b = new Bundle();
b.addEntry().getRequest().setMethod(DELETE).setUrl("Observation?_has:DiagnosticReport:result:identifier=foo|IDENTIFIER");
b.addEntry().setResource(rpt).getRequest().setMethod(PUT).setUrl("DiagnosticReport?identifier=foo|IDENTIFIER");
mySystemDao.transaction(mySrd, b);
callTransaction(b);
myObservationDao.read(obs1id);
try {
@ -187,8 +181,14 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
assertThat(rpt.getResult(), empty());
}
private List<HookParams> callTransaction(Bundle b) throws InterruptedException {
myTransactionProcessedLatch.setExpectedCount(1);
mySystemDao.transaction(mySrd, b);
return myTransactionProcessedLatch.awaitExpected();
}
@Test
public void testDeleteWithId_SourceModifiedToNoLongerIncludeReference() {
public void testDeleteWithId_SourceModifiedToNoLongerIncludeReference() throws InterruptedException {
Observation obs1 = new Observation();
obs1.setStatus(Observation.ObservationStatus.FINAL);
@ -211,7 +211,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
Bundle b = new Bundle();
b.addEntry().getRequest().setMethod(DELETE).setUrl(obs1id.getValue());
b.addEntry().setResource(rpt).getRequest().setMethod(PUT).setUrl(rptId.getValue());
mySystemDao.transaction(mySrd, b);
callTransaction(b);
myObservationDao.read(obs2id);
myDiagnosticReportDao.read(rptId);
@ -226,7 +226,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
@Test
public void testDeleteWithHas_SourceModifiedToStillIncludeReference() {
public void testDeleteWithHas_SourceModifiedToStillIncludeReference() throws InterruptedException {
Observation obs1 = new Observation();
obs1.setStatus(Observation.ObservationStatus.FINAL);
@ -252,7 +252,7 @@ public class TransactionHookTest extends BaseJpaR4SystemTest {
b.addEntry().getRequest().setMethod(DELETE).setUrl("Observation?_has:DiagnosticReport:result:identifier=foo|IDENTIFIER");
b.addEntry().setResource(rpt).getRequest().setMethod(PUT).setUrl("DiagnosticReport?identifier=foo|IDENTIFIER");
try {
mySystemDao.transaction(mySrd, b);
callTransaction(b);
fail();
} catch (ResourceVersionConflictException e ) {
assertThat(e.getMessage(), matchesPattern(Msg.code(550) + Msg.code(515) + "Unable to delete Observation/[0-9]+ because at least one resource has a reference to this resource. First reference found was resource DiagnosticReport/[0-9]+ in path DiagnosticReport.result"));

View File

@ -19,13 +19,12 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.HumanName;
import org.hl7.fhir.r4.model.Patient;
import javax.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -225,8 +224,8 @@ public class ThreadSafeResourceDeleterSvcTest extends BaseJpaR4Test {
public void cascadeDelete(RequestDetails theRequestDetails, DeleteConflictList theConflictList, IBaseResource theResource) throws InterruptedException {
myCalledLatch.call(theResource);
ourLog.info("Waiting to proceed with delete");
myWaitLatch.awaitExpected();
ourLog.info("Cascade Delete proceeding: {}", myWaitLatch.getLatchInvocationParameter());
List<HookParams> hookParams = myWaitLatch.awaitExpected();
ourLog.info("Cascade Delete proceeding: {}", PointcutLatch.getLatchInvocationParameter(hookParams));
myWaitLatch.setExpectedCount(1);
}

View File

@ -41,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -214,10 +215,10 @@ public class BinaryAccessProviderR4Test extends BaseResourceProviderR4Test {
latch.setExpectedCount(1);
try (CloseableHttpResponse resp = ourHttpClient.execute(get)) {
assertEquals(200, resp.getStatusLine().getStatusCode());
latch.awaitExpected();
List<HookParams> hookParams = latch.awaitExpected();
RequestDetails requestDetails = latch.getLatchInvocationParameterOfType(RequestDetails.class);
ResponseDetails responseDetails= latch.getLatchInvocationParameterOfType(ResponseDetails.class);
RequestDetails requestDetails = PointcutLatch.getInvocationParameterOfType(hookParams, RequestDetails.class);
ResponseDetails responseDetails= PointcutLatch.getInvocationParameterOfType(hookParams, ResponseDetails.class);
assertThat(responseDetails, is(notNullValue()));
assertThat(requestDetails, is(notNullValue()));

View File

@ -437,15 +437,15 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil
@Override
protected void afterResetInterceptors() {
super.afterResetInterceptors();
myInterceptorRegistry.unregisterInterceptor(myPerformanceTracingLoggingInterceptor);
// myInterceptorRegistry.unregisterInterceptor(myPerformanceTracingLoggingInterceptor);
}
@BeforeEach
public void beforeCreateInterceptor() {
myInterceptor = mock(IServerInterceptor.class);
myPerformanceTracingLoggingInterceptor = new PerformanceTracingLoggingInterceptor();
myInterceptorRegistry.registerInterceptor(myPerformanceTracingLoggingInterceptor);
// myPerformanceTracingLoggingInterceptor = new PerformanceTracingLoggingInterceptor();
// myInterceptorRegistry.registerInterceptor(myPerformanceTracingLoggingInterceptor);
}
@BeforeEach

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
@ -202,11 +203,15 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
ourCountHolder = myCountHolder;
}
// WIP STR5 consolidate with lambda
protected IIdType createResource(IBaseResource theResource, boolean theExpectDelivery) throws InterruptedException {
return createResource(theResource, theExpectDelivery, 1);
}
// WIP STR5 consolidate with lambda
protected IIdType createResource(IBaseResource theResource, boolean theExpectDelivery, int theCount) throws InterruptedException {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(theResource.getClass());
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.setExpectedCount(1);
mySubscriptionDeliveredLatch.setExpectedCount(theCount);
}
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
IIdType id = dao.create(theResource, mySrd).getId();
@ -225,8 +230,8 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
DaoMethodOutcome retval = dao.update(theResource, mySrd);
mySubscriptionTopicsCheckedLatch.awaitExpected();
ResourceModifiedMessage lastMessage = mySubscriptionTopicsCheckedLatch.getLatchInvocationParameterOfType(ResourceModifiedMessage.class);
List<HookParams> hookParams = mySubscriptionTopicsCheckedLatch.awaitExpected();
ResourceModifiedMessage lastMessage = PointcutLatch.getInvocationParameterOfType(hookParams, ResourceModifiedMessage.class);
assertEquals(theResource.getIdElement().toVersionless().toString(), lastMessage.getPayloadId());
if (theExpectDelivery) {

View File

@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -30,6 +29,7 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
@Test
public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception {
//setup
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Enumerations.EncounterStatus.PLANNED, Enumerations.EncounterStatus.COMPLETED, SubscriptionTopic.InteractionTrigger.CREATE);
waitForRegisteredSubscriptionTopicCount(1);
@ -38,10 +38,11 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
waitForActivatedSubscriptionCount(1);
assertEquals(0, getSystemProviderCount());
Encounter sentEncounter = sendEncounterWithStatus(Enumerations.EncounterStatus.COMPLETED, false);
await().until(() -> getSystemProviderCount() > 0);
// execute
Encounter sentEncounter = sendEncounterWithStatus(Enumerations.EncounterStatus.COMPLETED, true);
// verify
Bundle receivedBundle = getLastSystemProviderBundle();
List<IBaseResource> resources = BundleUtil.toListOfResources(myFhirCtx, receivedBundle);
assertEquals(2, resources.size());

View File

@ -77,6 +77,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -92,9 +93,14 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
assertEquals(sentObservation.getIdElement(), obs.getIdElement());
}
@NotNull
private Observation sendObservationExpectDelivery(int theCount) throws InterruptedException {
return sendObservation(OBS_CODE, "SNOMED-CT", true, theCount);
}
@NotNull
private Observation sendObservationExpectDelivery() throws InterruptedException {
return sendObservation(true);
return sendObservation(OBS_CODE, "SNOMED-CT", true, 1);
}
@Test
@ -274,6 +280,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@NotNull
private Subscription createTopicSubscription() throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -369,7 +376,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch);
ourLog.info("** About to send observation");
Observation sentObservation = sendObservation(false);
Observation sentObservation = sendObservation(OBS_CODE, "SNOMED-CT", false);
assertEquals("1", sentObservation.getIdElement().getVersionIdPart());
assertNull(sentObservation.getNoteFirstRep().getText());
@ -418,7 +425,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch);
ourLog.info("** About to send observation");
Observation sentObservation = sendObservation(false);
Observation sentObservation = sendObservation(OBS_CODE, "SNOMED-CT", false);
assertEquals("1", sentObservation.getIdElement().getVersionIdPart());
assertNull(sentObservation.getNoteFirstRep().getText());
@ -440,17 +447,20 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@Test
public void testRestHookSubscriptionApplicationJson() throws Exception {
ourLog.info(">>>1 Creating topics");
createObservationSubscriptionTopic(OBS_CODE);
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// Subscribe to OBS_CODE topic
ourLog.info(">>>2 Creating subscriptions");
Subscription subscription1 = createTopicSubscription();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_JSON_NEW);
// Subscribe to OBS_CODE2 topic
Subscription subscription2 = postSubscription(newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_JSON_NEW));
Subscription subscription2 = postSubscription(subscription);
waitForActivatedSubscriptionCount(2);
ourLog.info(">>>3 Send obs");
Observation sentObservation1 = sendObservationExpectDelivery();
awaitUntilReceivedTransactionCount(1);
Observation receivedObs = assertBundleAndGetObservation(subscription1, sentObservation1);
@ -462,25 +472,31 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
Subscription subscriptionTemp = myClient.read().resource(Subscription.class).withId(subscription2.getId()).execute();
assertNotNull(subscriptionTemp);
subscriptionTemp.setTopic(subscription1.getTopic());
ourLog.info(">>>4 Update sub");
updateResource(subscriptionTemp, false);
Observation observation2 = sendObservationExpectDelivery();
ourLog.info(">>>5 Send obs");
Observation observation2 = sendObservationExpectDelivery(2);
// Should see two subscription notifications since both now point to OBS_CODE2
awaitUntilReceivedTransactionCount(3);
// Delete the second subscription
ourLog.info(">>> Deleting {}", subscription2.getId());
ourLog.info(">>>6 Delete sub");
deleteSubscription(subscription2);
waitForActivatedSubscriptionCount(1);
ourLog.info(">>>7 Send obs");
IdType observationTemp3Id = sendObservationExpectDelivery().getIdElement().toUnqualifiedVersionless();
// Should see only one subscription notification
awaitUntilReceivedTransactionCount(4);
// Now update the observation to have OBS_CODE2
Observation observation3 = myClient.read().resource(Observation.class).withId(observationTemp3Id).execute();
setCode(observation3, OBS_CODE2);
CodeableConcept codeableConcept = new CodeableConcept();
observation3.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode(OBS_CODE + "111");
coding.setSystem("SNOMED-CT");
ourLog.info(">>>8 Send obs");
updateResource(observation3, true);
// Should see one subscription notification even though the new version doesn't match, the old version still does and our subscription topic
@ -489,8 +505,12 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
Observation observation3a = myClient.read().resource(Observation.class).withId(observationTemp3Id).execute();
// Now update it back to OBS_CODE again
setCode(observation3a, OBS_CODE);
CodeableConcept codeableConcept1 = new CodeableConcept();
observation3a.setCode(codeableConcept1);
Coding coding1 = codeableConcept1.addCoding();
coding1.setCode(OBS_CODE);
coding1.setSystem("SNOMED-CT");
ourLog.info(">>>9 Send obs");
updateResource(observation3a, true);
// Should see exactly one subscription notification
@ -501,14 +521,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
assertFalse(observation2.getId().isEmpty());
}
private static void setCode(Observation observation3a, String obsCode) {
CodeableConcept codeableConcept1 = new CodeableConcept();
observation3a.setCode(codeableConcept1);
Coding coding1 = codeableConcept1.addCoding();
coding1.setCode(obsCode);
coding1.setSystem("SNOMED-CT");
}
private void deleteSubscription(Subscription subscription2) throws InterruptedException {
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
myClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute();
@ -516,19 +528,12 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
private void awaitUntilReceivedTransactionCount(int theExpected) {
if (getSystemProviderCount() == theExpected) {
String list = getReceivedObservations().stream()
.map(t -> t.getIdElement().toUnqualifiedVersionless().getValue() + " " + t.getCode().getCodingFirstRep().getCode())
.collect(Collectors.joining(", "));
ourLog.info("Received {} transactions as expected: {}", theExpected, list);
} else {
String list = getReceivedObservations().stream()
.map(t -> t.getIdElement().toUnqualifiedVersionless().getValue() + " " + t.getCode().getCodingFirstRep().getCode())
.collect(Collectors.joining(", "));
String errorMessage = "Expected " + theExpected + " transactions, have " + getSystemProviderCount() + ": " + list;
await(errorMessage).until(() -> getSystemProviderCount() == theExpected);
}
}
@Test
public void testRestHookSubscriptionApplicationJsonDatabase() throws Exception {
@ -539,6 +544,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@Nonnull
private Subscription createTopicSubscription(String theTopicUrlSuffix) throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + theTopicUrlSuffix, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -548,6 +554,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
public void testSubscriptionTriggerViaSubscription() throws Exception {
createSubscriptionTopic();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -601,6 +608,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info("** About to create non-matching subscription");
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -608,11 +616,11 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info("** About to send observation that wont match");
sendObservation(false);
sendObservation(OBS_CODE, "SNOMED-CT", false);
awaitUntilReceivedTransactionCount(0);
ourLog.info("** About to update subscription topic");
SubscriptionTopic subscriptionTopicTemp = myClient.read().resource(SubscriptionTopic.class).withId(subscriptionTopic.getId()).execute();
SubscriptionTopic subscriptionTopicTemp = myClient.read(SubscriptionTopic.class, subscriptionTopic.getId());
assertNotNull(subscriptionTopicTemp);
setSubscriptionTopicCriteria(subscriptionTopicTemp, "Observation?code=SNOMED-CT|" + OBS_CODE);
updateResource(subscriptionTopicTemp, false);
@ -625,7 +633,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
deleteSubscription(subscription);
sendObservation(false);
sendObservation(OBS_CODE, "SNOMED-CT", false);
// No more matches
awaitUntilReceivedTransactionCount(1);
@ -641,9 +649,11 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription3 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
postSubscription(subscription3);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
postSubscription(subscription);
@ -675,10 +685,11 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
}
private void createSubscriptionTopicWithCriteria(String theCriteria) throws InterruptedException {
@Nonnull
private SubscriptionTopic createSubscriptionTopicWithCriteria(String theCriteria) throws InterruptedException {
SubscriptionTopic subscriptionTopic = buildSubscriptionTopic(CUSTOM_URL);
setSubscriptionTopicCriteria(subscriptionTopic, theCriteria);
createSubscriptionTopic(subscriptionTopic);
return createSubscriptionTopic(subscriptionTopic);
}
@Test
@ -699,7 +710,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
updateResource(subscription, false);
// Send another observation
sendObservation(false);
sendObservation(OBS_CODE, "SNOMED-CT", false);
// Should see no new delivery
awaitUntilReceivedTransactionCount(1);
@ -780,7 +791,9 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setExpression("Observation.extension('Observation#accessType')");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
mySearchParameterDao.create(sp, mySrd);
mySubscriptionTopicsCheckedLatch.awaitExpected();
mySearchParamRegistry.forceRefresh();
createSubscriptionTopicWithCriteria(criteria);
waitForRegisteredSubscriptionTopicCount(1);
@ -846,20 +859,22 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
return retval;
}
private Observation sendObservation(String theCode, String theSystem, boolean theExpectDelivery) throws InterruptedException {
return sendObservation(theCode, theSystem, theExpectDelivery, 1);
}
private Observation sendObservation(boolean theExpectDelivery) throws
InterruptedException {
private Observation sendObservation(String theCode, String theSystem, boolean theExpectDelivery, int theCount) throws InterruptedException {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
Coding coding = codeableConcept.addCoding();
coding.setCode(OBS_CODE);
coding.setSystem("SNOMED-CT");
coding.setCode(theCode);
coding.setSystem(theSystem);
observation.setStatus(Enumerations.ObservationStatus.FINAL);
IIdType id = createResource(observation, theExpectDelivery);
IIdType id = createResource(observation, theExpectDelivery, theCount);
observation.setId(id);
return observation;

View File

@ -39,7 +39,7 @@ public class CountingInterceptor implements ChannelInterceptor {
@Override
public void afterSendCompletion(Message<?> theMessage, MessageChannel theChannel, boolean theSent, Exception theException) {
ourLog.info("Counting another instance: {}", theMessage);
ourLog.info("Send complete for message: {}", theMessage);
if (theSent) {
mySent.add(theMessage.toString());
}

View File

@ -217,12 +217,12 @@ class HapiMigratorIT {
protected void doExecute() {
try {
myLatch.call(this);
myWaitLatch.awaitExpected();
ourLog.info("Latch released with parameter {}", myWaitLatch.getLatchInvocationParameter());
List<HookParams> hookParams = myWaitLatch.awaitExpected();
ourLog.info("Latch released with parameter {}", PointcutLatch.getLatchInvocationParameter(hookParams));
// We sleep a bit to ensure the other thread has a chance to try to get the lock. We don't have a hook there, so sleep instead
// Maybe we can await on a log message?
Thread.sleep(200);
ourLog.info("Completing execution of {}", myWaitLatch.getLatchInvocationParameter());
ourLog.info("Completing execution of {}", PointcutLatch.getLatchInvocationParameter(hookParams));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

View File

@ -26,19 +26,16 @@ import ca.uhn.fhir.jpa.binary.api.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import org.apache.commons.lang3.StringUtils;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isBlank;

View File

@ -29,7 +29,6 @@ import org.hl7.fhir.r4.model.Measure;
import org.hl7.fhir.r4.model.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;

View File

@ -19,10 +19,10 @@
*/
package ca.uhn.fhir.cr.r4.measure;
import ca.uhn.fhir.cr.enumeration.CareGapsStatusCode;
import ca.uhn.fhir.cr.common.IDaoRegistryUser;
import ca.uhn.fhir.cr.common.Searches;
import ca.uhn.fhir.cr.config.CrProperties;
import ca.uhn.fhir.cr.enumeration.CareGapsStatusCode;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.rest.api.server.RequestDetails;
@ -59,7 +59,6 @@ import org.opencds.cqf.cql.evaluator.fhir.util.Ids;
import org.opencds.cqf.cql.evaluator.fhir.util.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.AbstractMap;
import java.util.ArrayList;

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.IdType;
@ -339,13 +338,14 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
@Override
public String toString() {
ToStringBuilder stringBuilder = new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
.append("idElement", myIdElement);
// .append("status", myStatus)
// .append("endpointUrl", myEndpointUrl)
// .append("payloadString", myPayloadString)
ToStringBuilder stringBuilder = new ToStringBuilder(this)
.append("myIdElement", myIdElement)
.append("myStatus", myStatus)
.append("myCriteriaString", myCriteriaString);
// .append("myEndpointUrl", myEndpointUrl)
// .append("myPayloadString", myPayloadString)
// .append("myHeaders", myHeaders)
// .append("channelType", myChannelType);
// .append("myChannelType", myChannelType)
// .append("myTrigger", myTrigger)
// .append("myEmailDetails", myEmailDetails)
// .append("myRestHookDetails", myRestHookDetails)

View File

@ -127,7 +127,7 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
@Override
public String toString() {
return new ToStringBuilder(this)
.append("mySubscription", mySubscription)
.append("mySubscription", mySubscription == null ? "null" : mySubscription.getIdElementString())
// it isn't safe to log payloads
.append("myPayloadString", "[Not Logged]")
.append("myPayload", myPayloadDecoded)

View File

@ -36,9 +36,9 @@ import java.util.stream.Stream;
public abstract class BaseFhirVersionParameterizedTest {
@RegisterExtension
public final RestServerR4Helper myRestServerR4Helper = new RestServerR4Helper(true);
public final RestServerR4Helper myRestServerR4Helper = RestServerR4Helper.newInitialized();
@RegisterExtension
public final RestServerDstu3Helper myRestServerDstu3Helper = new RestServerDstu3Helper(true);
public final RestServerDstu3Helper myRestServerDstu3Helper = RestServerDstu3Helper.newInitialized();
@RegisterExtension
public TlsAuthenticationTestHelper myTlsAuthenticationTestHelper = new TlsAuthenticationTestHelper();

View File

@ -20,7 +20,6 @@
package ca.uhn.fhir.test.utilities;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.rest.annotation.Transaction;
@ -50,26 +49,35 @@ import javax.servlet.ServletException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
public class RestServerDstu3Helper extends BaseRestServerHelper implements IPointcutLatch, BeforeEachCallback, AfterEachCallback {
protected final MyRestfulServer myRestServer;
public RestServerDstu3Helper() {
this(false);
this(false, false);
}
public RestServerDstu3Helper(boolean theInitialize) {
private RestServerDstu3Helper(boolean theInitialize, boolean theTransactionLatchEnabled) {
super(FhirContext.forDstu3());
myRestServer = new MyRestfulServer(myFhirContext);
if(theInitialize){
myRestServer = new MyRestfulServer(myFhirContext, theTransactionLatchEnabled);
if (theInitialize) {
try {
myRestServer.initialize();
} catch (ServletException e) {
throw new RuntimeException(Msg.code(2252)+"Failed to initialize server", e);
throw new RuntimeException(Msg.code(2252) + "Failed to initialize server", e);
}
}
}
public static RestServerDstu3Helper newInitialized() {
return new RestServerDstu3Helper(true, false);
}
public static RestServerDstu3Helper newWithTransactionLatch() {
return new RestServerDstu3Helper(false, true);
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
startServer(myRestServer);
@ -174,32 +182,56 @@ public class RestServerDstu3Helper extends BaseRestServerHelper implements IPoin
public static class MyPlainProvider implements IPointcutLatch {
private final PointcutLatch myPointcutLatch = new PointcutLatch("Transaction Counting Provider");
private final List<IBaseBundle> myTransactions = Collections.synchronizedList(new ArrayList<>());
private boolean myTransactionLatchEnabled;
public MyPlainProvider(boolean theTransactionLatchEnabled) {
this.myTransactionLatchEnabled = theTransactionLatchEnabled;
}
@Transaction
public synchronized IBaseBundle transaction(@TransactionParam IBaseBundle theBundle) {
if (myTransactionLatchEnabled) {
myPointcutLatch.call(theBundle);
}
myTransactions.add(theBundle);
return theBundle;
}
@Override
public void clear() {
if (!myTransactionLatchEnabled) {
throw new IllegalStateException("Can't call clear() on a provider that doesn't use a latch");
}
myPointcutLatch.clear();
}
@Override
public void setExpectedCount(int theCount) {
if (!myTransactionLatchEnabled) {
throw new IllegalStateException("Can't call clear() on a provider that doesn't use a latch");
}
myPointcutLatch.setExpectedCount(theCount);
}
@Override
public List<HookParams> awaitExpected() throws InterruptedException {
if (!myTransactionLatchEnabled) {
throw new IllegalStateException("Can't call clear() on a provider that doesn't use a latch");
}
return myPointcutLatch.awaitExpected();
}
public List<IBaseBundle> getTransactions() {
return Collections.unmodifiableList(new ArrayList<>(myTransactions));
}
public void setTransactionLatchEnabled(boolean theTransactionLatchEnabled) {
this.myTransactionLatchEnabled = theTransactionLatchEnabled;
}
public boolean isTransactionLatchEnabled() {
return myTransactionLatchEnabled;
}
}
private static class MyRestfulServer extends RestfulServer {
@ -209,15 +241,24 @@ public class RestServerDstu3Helper extends BaseRestServerHelper implements IPoin
private HashMapResourceProvider<Organization> myOrganizationResourceProvider;
private HashMapResourceProvider<ConceptMap> myConceptMapResourceProvider;
private MyPlainProvider myPlainProvider;
private final boolean myInitialTransactionLatchEnabled;
public MyRestfulServer(FhirContext theFhirContext) {
public MyRestfulServer(FhirContext theFhirContext, boolean theInitialTransactionLatchEnabled) {
super(theFhirContext);
myInitialTransactionLatchEnabled = theInitialTransactionLatchEnabled;
}
public MyPlainProvider getPlainProvider() {
return myPlainProvider;
}
public <T> T executeWithLatch(Supplier<T> theSupplier) throws InterruptedException {
myPlainProvider.setExpectedCount(1);
T retval = theSupplier.get();
myPlainProvider.awaitExpected();
return retval;
}
public void setFailNextPut(boolean theFailNextPut) {
myFailNextPut = theFailNextPut;
}
@ -229,7 +270,16 @@ public class RestServerDstu3Helper extends BaseRestServerHelper implements IPoin
provider.clearCounts();
}
}
myPlainProvider.clear();
if (isTransactionLatchEnabled()) {
getPlainProvider().clear();
}
}
private boolean isTransactionLatchEnabled() {
if (getPlainProvider() == null) {
return false;
}
return getPlainProvider().isTransactionLatchEnabled();
}
public void clearDataAndCounts() {
@ -272,7 +322,7 @@ public class RestServerDstu3Helper extends BaseRestServerHelper implements IPoin
myConceptMapResourceProvider = new MyHashMapResourceProvider(fhirContext, ConceptMap.class);
registerProvider(myConceptMapResourceProvider);
myPlainProvider = new MyPlainProvider();
myPlainProvider = new MyPlainProvider(myInitialTransactionLatchEnabled);
registerProvider(myPlainProvider);
}
@ -293,7 +343,7 @@ public class RestServerDstu3Helper extends BaseRestServerHelper implements IPoin
@Override
public MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) {
if (myFailNextPut) {
throw new PreconditionFailedException(Msg.code(2251)+"Failed update operation");
throw new PreconditionFailedException(Msg.code(2251) + "Failed update operation");
}
return super.update(theResource, theConditional, theRequestDetails);
}

View File

@ -61,12 +61,12 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
protected final MyRestfulServer myRestServer;
public RestServerR4Helper() {
this(false);
this(false, false);
}
public RestServerR4Helper(boolean theInitialize) {
private RestServerR4Helper(boolean theInitialize, boolean theTransactionLatchEnabled) {
super(FhirContext.forR4Cached());
myRestServer = new MyRestfulServer(myFhirContext);
myRestServer = new MyRestfulServer(myFhirContext, theTransactionLatchEnabled);
if (theInitialize) {
try {
myRestServer.initialize();
@ -76,6 +76,14 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
}
}
public static RestServerR4Helper newWithTransactionLatch() {
return new RestServerR4Helper(false, true);
}
public static RestServerR4Helper newInitialized() {
return new RestServerR4Helper(true, false);
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
startServer(myRestServer);
@ -252,6 +260,14 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
myRestServer.setServerAddressStrategy(theServerAddressStrategy);
}
public void executeWithLatch(Runnable theRunnable) throws InterruptedException {
myRestServer.executeWithLatch(theRunnable);
}
public void enableTransactionLatch(boolean theTransactionLatchEnabled) {
myRestServer.setTransactionLatchEnabled(theTransactionLatchEnabled);
}
private static class MyRestfulServer extends RestfulServer {
private final List<String> myRequestUrls = Collections.synchronizedList(new ArrayList<>());
private final List<String> myRequestVerbs = Collections.synchronizedList(new ArrayList<>());
@ -263,13 +279,31 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
private HashMapResourceProvider<ConceptMap> myConceptMapResourceProvider;
private RestServerDstu3Helper.MyPlainProvider myPlainProvider;
public MyRestfulServer(FhirContext theFhirContext) {
private final boolean myInitialTransactionLatchEnabled;
public MyRestfulServer(FhirContext theFhirContext, boolean theInitialTransactionLatchEnabled) {
super(theFhirContext);
myInitialTransactionLatchEnabled = theInitialTransactionLatchEnabled;
}
public RestServerDstu3Helper.MyPlainProvider getPlainProvider() {
return myPlainProvider;
}
protected boolean isTransactionLatchEnabled() {
if (getPlainProvider() == null) {
return false;
}
return getPlainProvider().isTransactionLatchEnabled();
}
public void setTransactionLatchEnabled(boolean theTransactionLatchEnabled) {
getPlainProvider().setTransactionLatchEnabled(theTransactionLatchEnabled);
}
public void executeWithLatch(Runnable theRunnable) throws InterruptedException {
myPlainProvider.setExpectedCount(1);
theRunnable.run();
myPlainProvider.awaitExpected();
}
public void setFailNextPut(boolean theFailNextPut) {
myFailNextPut = theFailNextPut;
@ -282,7 +316,9 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
provider.clearCounts();
}
}
if (isTransactionLatchEnabled()) {
myPlainProvider.clear();
}
myRequestUrls.clear();
myRequestVerbs.clear();
}
@ -364,7 +400,7 @@ public class RestServerR4Helper extends BaseRestServerHelper implements BeforeEa
myConceptMapResourceProvider = new MyHashMapResourceProvider(fhirContext, ConceptMap.class);
registerProvider(myConceptMapResourceProvider);
myPlainProvider = new RestServerDstu3Helper.MyPlainProvider();
myPlainProvider = new RestServerDstu3Helper.MyPlainProvider(myInitialTransactionLatchEnabled);
registerProvider(myPlainProvider);
setPagingProvider(new FifoMemoryPagingProvider(20));

View File

@ -27,43 +27,33 @@ import ca.uhn.fhir.interceptor.api.IPointcut;
import com.google.common.collect.ListMultimap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
// This class is primarily used for testing.
public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class);
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
private static final FhirObjectPrinter ourFhirObjectToStringMapper = new FhirObjectPrinter();
private final String myName;
private final AtomicLong myLastInvoke = new AtomicLong();
private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>();
private final AtomicReference<String> myCountdownLatchSetStacktrace = new AtomicReference<>();
private final AtomicReference<List<String>> myFailures = new AtomicReference<>();
private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>();
private final IPointcut myPointcut;
private boolean myStrict = true;
private final AtomicLong myLastInvoke = new AtomicLong();
private int myDefaultTimeoutSeconds = DEFAULT_TIMEOUT_SECONDS;
private int myInitialCount;
private boolean myExactMatch;
private final List<PointcutLatchException> myUnexpectedInvocations = new ArrayList<>();
private final AtomicReference<PointcutLatchSession> myPointcutLatchSession = new AtomicReference<>();
public PointcutLatch(IPointcut thePointcut) {
this.myName = thePointcut.name();
myPointcut = thePointcut;
}
public PointcutLatch(String theName) {
this.myName = theName;
myPointcut = null;
@ -80,9 +70,8 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
}
// Useful for debugging when you need more time to step through a method
public PointcutLatch setDefaultTimeoutSeconds(int theDefaultTimeoutSeconds) {
public void setDefaultTimeoutSeconds(int theDefaultTimeoutSeconds) {
myDefaultTimeoutSeconds = theDefaultTimeoutSeconds;
return this;
}
@Override
@ -91,12 +80,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
}
public void setExpectedCount(int theCount, boolean theExactMatch) {
if (myCountdownLatch.get() != null) {
String previousStack = myCountdownLatchSetStacktrace.get();
throw new PointcutLatchException(Msg.code(1480) + "setExpectedCount() called before previous awaitExpected() completed. Previous set stack:\n" + previousStack);
if (myPointcutLatchSession.get() != null) {
String previousStack = myPointcutLatchSession.get().getStackTrace();
throw new PointcutLatchException(Msg.code(1480) + "setExpectedCount() called before previous awaitExpected() completed. Previous set stack:\n" + previousStack, myName);
}
myExactMatch = theExactMatch;
createLatch(theCount);
startSession(theCount, theExactMatch);
if (theExactMatch) {
ourLog.info("Expecting exactly {} calls to {} latch", theCount, myName);
} else {
@ -109,27 +97,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
}
public boolean isSet() {
return myCountdownLatch.get() != null;
return myPointcutLatchSession.get() != null;
}
private void createLatch(int theCount) {
myFailures.set(Collections.synchronizedList(new ArrayList<>()));
myCalledWith.set(Collections.synchronizedList(new ArrayList<>()));
myCountdownLatch.set(new CountDownLatch(theCount));
try {
throw new Exception(Msg.code(1481));
} catch (Exception e) {
myCountdownLatchSetStacktrace.set(ExceptionUtils.getStackTrace(e));
}
myInitialCount = theCount;
}
private void addFailure(String failure) {
if (myFailures.get() != null) {
myFailures.get().add(failure);
} else {
throw new PointcutLatchException(Msg.code(1482) + "trying to set failure on latch that hasn't been created: " + failure);
}
private void startSession(int theCount, boolean theExactMatch) {
myPointcutLatchSession.set(new PointcutLatchSession(getName(), theCount, theExactMatch));
}
private String getName() {
@ -141,77 +113,59 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
return awaitExpectedWithTimeout(myDefaultTimeoutSeconds);
}
public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
List<HookParams> retval = myCalledWith.get();
try {
CountDownLatch latch = myCountdownLatch.get();
Validate.notNull(latch, getName() + " awaitExpected() called before setExpected() called.");
if (!latch.await(timeoutSecond, TimeUnit.SECONDS)) {
throw new LatchTimedOutError(Msg.code(1483) + getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + latch.getCount() + ".");
}
public List<HookParams> awaitExpectedWithTimeout(int theTimeoutSecond) throws InterruptedException {
PointcutLatchSession initialSession = myPointcutLatchSession.get();
// Defend against ConcurrentModificationException
String error = getName();
if (myFailures.get() != null && myFailures.get().size() > 0) {
List<String> failures = new ArrayList<>(myFailures.get());
if (failures.size() > 1) {
error += " ERRORS: \n";
List<HookParams> retval;
try {
if (isSet()) {
retval = myPointcutLatchSession.get().awaitExpectedWithTimeout(theTimeoutSecond);
Validate.isTrue(initialSession.equals(myPointcutLatchSession.get()), "Concurrency error: Latch session switched while waiting.");
} else {
error += " ERROR: ";
}
error += String.join("\n", failures);
error += "\nLatch called with values: " + toCalledWithString();
throw new AssertionError(Msg.code(1484) + error);
throw new PointcutLatchException("awaitExpected() called before setExpected() called.", myName);
}
} finally {
clear();
}
Validate.isTrue(retval.equals(myCalledWith.get()), "Concurrency error: Latch switched while waiting.");
return retval;
}
@Override
public void clear() {
ourLog.debug("Clearing latch {}", getName());
myCountdownLatch.set(null);
myCountdownLatchSetStacktrace.set(null);
checkExceptions();
myPointcutLatchSession.set(null);
myUnexpectedInvocations.clear();
}
private String toCalledWithString() {
if (myCalledWith.get() == null) {
return "[]";
private void checkExceptions() {
if (!myStrict) {
return;
}
if (myUnexpectedInvocations.size() > 0) {
PointcutLatchException firstException = myUnexpectedInvocations.get(0);
int size = myUnexpectedInvocations.size();
if (firstException != null) {
throw new AssertionError(Msg.code(2344) + getName() + " had " + size + " exceptions. Throwing first one.", firstException);
}
// Defend against ConcurrentModificationException
List<HookParams> calledWith = new ArrayList<>(myCalledWith.get());
if (calledWith.isEmpty()) {
return "[]";
}
String retVal = "[ ";
retVal += calledWith.stream().flatMap(hookParams -> hookParams.values().stream()).map(ourFhirObjectToStringMapper).collect(Collectors.joining(", "));
return retVal + " ]";
}
@Override
public void invoke(IPointcut thePointcut, HookParams theArgs) {
myLastInvoke.set(System.currentTimeMillis());
CountDownLatch latch = myCountdownLatch.get();
if (myExactMatch) {
if (latch == null) {
throw new PointcutLatchException(Msg.code(1485) + "invoke() for " + myName + " called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived with args: " + theArgs, theArgs);
} else if (latch.getCount() <= 0) {
addFailure("invoke() called when countdown was zero.");
try {
PointcutLatchSession session = myPointcutLatchSession.get();
if (session == null) {
throw new PointcutLatchException(Msg.code(1485) + "invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke().", myName, theArgs);
}
} else if (latch == null || latch.getCount() <= 0) {
return;
session.invoke(theArgs);
} catch (PointcutLatchException e) {
myUnexpectedInvocations.add(e);
throw e;
}
if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs);
}
ourLog.debug("Called {} {} with {}", myName, latch, hookParamsToString(theArgs));
latch.countDown();
}
public void call(Object arg) {
@ -222,46 +176,24 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
public String toString() {
return new ToStringBuilder(this)
.append("name", myName)
.append("myCountdownLatch", myCountdownLatch)
// .append("myFailures", myFailures)
// .append("myCalledWith", myCalledWith)
.append("myInitialCount", myInitialCount)
.append("pointCutLatchSession", myPointcutLatchSession)
.toString();
}
public Object getLatchInvocationParameter() {
return getLatchInvocationParameter(myCalledWith.get());
public void setStrict(Boolean theStrict) {
myStrict = theStrict;
}
@SuppressWarnings("unchecked")
public <T> T getLatchInvocationParameterOfType(Class<T> theType) {
List<HookParams> hookParamsList = myCalledWith.get();
Validate.notNull(hookParamsList);
Validate.isTrue(hookParamsList.size() == 1, "Expected Pointcut to be invoked 1 time");
HookParams hookParams = hookParamsList.get(0);
public static <T> T getInvocationParameterOfType(List<HookParams> theHookParams, Class<T> theType) {
Validate.notNull(theHookParams);
Validate.isTrue(theHookParams.size() == 1, "Expected Pointcut to be invoked 1 time");
HookParams hookParams = theHookParams.get(0);
ListMultimap<Class<?>, Object> paramsForType = hookParams.getParamsForType();
List<Object> objects = paramsForType.get(theType);
Validate.isTrue(objects.size() == 1);
return (T) objects.get(0);
}
private class PointcutLatchException extends IllegalStateException {
private static final long serialVersionUID = 1372636272233536829L;
PointcutLatchException(String message, HookParams theArgs) {
super(getName() + ": " + message + " called with values: " + hookParamsToString(theArgs));
}
public PointcutLatchException(String message) {
super(getName() + ": " + message);
}
}
private static String hookParamsToString(HookParams hookParams) {
return hookParams.values().stream().map(ourFhirObjectToStringMapper).collect(Collectors.joining(", "));
}
public static Object getLatchInvocationParameter(List<HookParams> theHookParams) {
Validate.notNull(theHookParams);
Validate.isTrue(theHookParams.size() == 1, "Expected Pointcut to be invoked 1 time");

View File

@ -0,0 +1,15 @@
package ca.uhn.test.concurrency;
import ca.uhn.fhir.interceptor.api.HookParams;
class PointcutLatchException extends IllegalStateException {
private static final long serialVersionUID = 1372636272233536829L;
PointcutLatchException(String theMessage, String theName, HookParams theArgs) {
super(theName + ": " + theMessage + " called with values:\n" + theArgs);
}
public PointcutLatchException(String theMessage, String theName) {
super(theName + ": " + theMessage);
}
}

View File

@ -0,0 +1,85 @@
package ca.uhn.test.concurrency;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* This object exists from the time setExpectedCount() is called until awaitExpected() completes. It tracks
* invocations on the PointcutLatch and throws exceptions when incorrect states are detected.
*/
public class PointcutLatchSession {
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatchSession.class);
private final List<String> myFailures = Collections.synchronizedList(new ArrayList<>());
private final List<HookParams> myCalledWith = Collections.synchronizedList(new ArrayList<>());
private final CountDownLatch myCountdownLatch;
private final String myStacktrace;
private final String myName;
private final int myInitialCount;
private final boolean myExactMatch;
PointcutLatchSession(String theName, int theCount, boolean theExactMatch) {
myName = theName;
myInitialCount = theCount;
myCountdownLatch = new CountDownLatch(theCount);
myExactMatch = theExactMatch;
try {
throw new Exception(Msg.code(1481));
} catch (Exception e) {
myStacktrace = ExceptionUtils.getStackTrace(e);
}
}
String getStackTrace() {
return myStacktrace;
}
List<HookParams> awaitExpectedWithTimeout(int theTimeoutSecond) throws InterruptedException {
if (!myCountdownLatch.await(theTimeoutSecond, TimeUnit.SECONDS)) {
throw new LatchTimedOutError(Msg.code(1483) + myName + " timed out waiting " + theTimeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + myCountdownLatch.getCount() + ".");
}
// Defend against ConcurrentModificationException
String error = myName;
if (myFailures.size() > 0) {
List<String> failures = new ArrayList<>(myFailures);
if (failures.size() > 1) {
error += " ERRORS: \n";
} else {
error += " ERROR: ";
}
error += String.join("\n", failures);
error += "\nLatch called " + myCalledWith.size() + " times with values:\n" + StringUtils.join(myCalledWith, "\n");
throw new AssertionError(Msg.code(1484) + error);
}
return myCalledWith;
}
void invoke(HookParams theArgs) {
if (myExactMatch) {
if (myCountdownLatch.getCount() <= 0) {
myFailures.add("invoke() called when countdown was zero.");
}
}
myCalledWith.add(theArgs);
ourLog.debug("Called {} {} with {}", myName, myCountdownLatch, theArgs);
myCountdownLatch.countDown();
}
List<HookParams> getCalledWith() {
return myCalledWith;
}
}

View File

@ -0,0 +1,138 @@
package ca.uhn.test.concurrency;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.fail;
class PointcutLatchTest {
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatchTest.class);
public static final String TEST_LATCH_NAME = "test-latch-name";
private final ExecutorService myExecutorService = Executors.newSingleThreadExecutor();
private final PointcutLatch myPointcutLatch = new PointcutLatch(TEST_LATCH_NAME);
@AfterEach
void after() {
myPointcutLatch.clear();
myPointcutLatch.setStrict(true);
}
@Test
public void testInvokeSameThread() throws InterruptedException {
myPointcutLatch.setExpectedCount(1);
Thread thread = invoke();
assertEquals(thread, Thread.currentThread());
myPointcutLatch.awaitExpected();
}
private Thread invoke() {
ourLog.info("invoke");
myPointcutLatch.call(this);
return Thread.currentThread();
}
@Test
public void testInvokeDifferentThread() throws InterruptedException, ExecutionException {
myPointcutLatch.setExpectedCount(1);
Future<Thread> future = myExecutorService.submit(this::invoke);
myPointcutLatch.awaitExpected();
assertNotEquals(Thread.currentThread(), future.get());
}
@Test
public void testDoubleExpect() {
myPointcutLatch.setExpectedCount(1);
try {
myPointcutLatch.setExpectedCount(1);
fail();
} catch (PointcutLatchException e) {
assertThat(e.getMessage(), startsWith(TEST_LATCH_NAME + ": HAPI-1480: setExpectedCount() called before previous awaitExpected() completed. Previous set stack:"));
}
}
@Test
public void testNotCalled() throws InterruptedException {
myPointcutLatch.setExpectedCount(1);
try {
myPointcutLatch.awaitExpectedWithTimeout(1);
fail();
} catch (LatchTimedOutError e) {
assertEquals("HAPI-1483: test-latch-name PointcutLatch timed out waiting 1 seconds for latch to countdown from 1 to 0. Is 1.", e.getMessage());
}
}
@Test
public void testAwaitExpectedCalledBeforeExpect() throws InterruptedException {
try {
myPointcutLatch.awaitExpected();
fail();
} catch (PointcutLatchException e) {
assertEquals(TEST_LATCH_NAME + ": awaitExpected() called before setExpected() called.", e.getMessage());
}
}
@Test
public void testInvokeCalledBeforeExpect() {
try {
invoke();
fail();
} catch (PointcutLatchException e) {
assertThat(e.getMessage(), startsWith(TEST_LATCH_NAME + ": HAPI-1485: invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke()."));
}
// Don't blow up in the clear() called by @AfterEach
myPointcutLatch.setStrict(false);
}
@Test
public void testDoubleInvokeInexact() throws InterruptedException {
myPointcutLatch.setExpectedCount(1, false);
invoke();
invoke();
myPointcutLatch.awaitExpected();
}
@Test
public void testDoubleInvokeExact() throws InterruptedException {
myPointcutLatch.setExpectedCount(1);
invoke();
try {
invoke();
myPointcutLatch.awaitExpected();
fail();
} catch (AssertionError e) {
assertThat(e.getMessage(), startsWith("HAPI-1484: test-latch-name PointcutLatch ERROR: invoke() called when countdown was zero."));
}
}
@Test
public void testInvokeThenClear() throws ExecutionException, InterruptedException {
Future<Thread> future = myExecutorService.submit(this::invoke);
try {
future.get();
} catch (ExecutionException e) {
// This is the exception thrown on the invocation thread
assertThat(e.getMessage(), startsWith("ca.uhn.test.concurrency.PointcutLatchException: " + TEST_LATCH_NAME + ": HAPI-1485: invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke()."));
}
try {
myPointcutLatch.clear();
} catch (AssertionError e) {
// This is the exception the test thread gets
assertThat(e.getMessage(), startsWith("HAPI-2344: " + TEST_LATCH_NAME + " PointcutLatch had 1 exceptions. Throwing first one."));
}
// Don't blow up in the clear() called by @AfterEach
myPointcutLatch.setStrict(false);
}
}

View File

@ -117,7 +117,7 @@
<module>hapi-fhir-android</module>
<module>hapi-fhir-cli</module>
<module>hapi-fhir-dist</module>
<module>tests/hapi-fhir-base-test-jaxrsserver-kotlin</module>
<!-- <module>tests/hapi-fhir-base-test-jaxrsserver-kotlin</module>-->
<module>tests/hapi-fhir-base-test-mindeps-client</module>
<module>tests/hapi-fhir-base-test-mindeps-server</module>
<module>hapi-fhir-spring-boot</module>