From 2c7eb39b29510220e296417d4696de5f33ad3519 Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Tue, 22 Jan 2019 18:53:54 -0500 Subject: [PATCH] final batch of windows fixes (to deal with jumpy windows clock) also added semaphore to PointcutLatch --- .../FhirResourceDaoDstu2SearchNoFtTest.java | 13 ++- .../FhirResourceDaoDstu3SearchNoFtTest.java | 21 ++-- .../r4/FhirResourceDaoR4SearchNoFtTest.java | 20 +++- .../FhirResourceDaoR4SearchNoHashesTest.java | 7 ++ .../jpa/dao/r4/FhirResourceDaoR4Test.java | 8 +- .../dao/r4/FhirResourceDaoR4UpdateTest.java | 4 +- .../r4/ResourceProviderR4CacheTest.java | 9 +- .../subscription/module/LatchedService.java | 72 -------------- .../subscription/module/PointcutLatch.java | 95 +++++++++++++++++++ ...kingQueueSubscribableChannelDstu3Test.java | 23 +++-- .../SubscriptionCheckingSubscriberTest.java | 8 +- .../SubscriptionMatchingSubscriberTest.java | 8 +- 12 files changed, 173 insertions(+), 115 deletions(-) delete mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/LatchedService.java create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/FhirResourceDaoDstu2SearchNoFtTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/FhirResourceDaoDstu2SearchNoFtTest.java index 9662d068099..bfa828616d7 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/FhirResourceDaoDstu2SearchNoFtTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu2/FhirResourceDaoDstu2SearchNoFtTest.java @@ -374,6 +374,9 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test { patient.addIdentifier().setSystem("urn:system").setValue("001"); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + + TestUtil.sleepOneClick(); + long betweenTime = System.currentTimeMillis(); IIdType id2; { @@ -820,8 +823,6 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test { @Test public void testSearchLastUpdatedParamWithComparator() throws InterruptedException { - String methodName = "testSearchLastUpdatedParamWithComparator"; - IIdType id0; { Patient patient = new Patient(); @@ -829,18 +830,16 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test { id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - int sleep = 100; - long start = System.currentTimeMillis(); - Thread.sleep(sleep); + TestUtil.sleepOneClick(); - DateTimeDt beforeAny = new DateTimeDt(new Date(), TemporalPrecisionEnum.MILLI); IIdType id1a; { Patient patient = new Patient(); patient.addIdentifier().setSystem("urn:system").setValue("001"); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); IIdType id1b; { Patient patient = new Patient(); @@ -853,7 +852,7 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test { InstantDt id1bpublished = ResourceMetadataKeyEnum.PUBLISHED.get(myPatientDao.read(id1b, mySrd)); ourLog.info("Res 3: {}", id1bpublished.getValueAsString()); - Thread.sleep(sleep); + TestUtil.sleepOneClick(); long end = System.currentTimeMillis(); SearchParameterMap params; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoDstu3SearchNoFtTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoDstu3SearchNoFtTest.java index 2e7d2c50ff0..77bdfa62ed1 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoDstu3SearchNoFtTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoDstu3SearchNoFtTest.java @@ -1023,7 +1023,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { patient.addName().setFamily("testSearchLanguageParam").addGiven("Joe"); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - + TestUtil.sleepOneClick(); Date betweenTime = new Date(); IIdType id2; @@ -1199,10 +1199,9 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - int sleep = 100; - + TestUtil.sleepOneClick(); long start = System.currentTimeMillis(); - TestUtil.sleepAtLeast(sleep); + TestUtil.sleepOneClick(); IIdType id1a; { @@ -1221,7 +1220,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { ourLog.info("Res 2: {}", myPatientDao.read(id1a, mySrd).getMeta().getLastUpdatedElement().getValueAsString()); ourLog.info("Res 3: {}", myPatientDao.read(id1b, mySrd).getMeta().getLastUpdatedElement().getValueAsString()); - TestUtil.sleepAtLeast(sleep); + TestUtil.sleepOneClick(); long end = System.currentTimeMillis(); SearchParameterMap map; @@ -1856,15 +1855,15 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); Date between = new Date(); - Thread.sleep(10); { Patient patient = new Patient(); patient.addIdentifier().setSystem("urn:system").setValue("002"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John"); pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - Thread.sleep(10); + TestUtil.sleepOneClick(); Date after = new Date(); SearchParameterMap params; @@ -2874,6 +2873,8 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); + Date betweenDate = new Date(); IIdType tag2id; @@ -3196,7 +3197,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { p01.addName().setFamily("B").addGiven("A"); String id1 = myPatientDao.create(p01).getId().toUnqualifiedVersionless().getValue(); - Thread.sleep(10); + TestUtil.sleepOneClick(); // Numeric ID Patient p02 = new Patient(); @@ -3206,7 +3207,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { p02.addName().setFamily("Z").addGiven("Z"); String id2 = myPatientDao.create(p02).getId().toUnqualifiedVersionless().getValue(); - Thread.sleep(10); + TestUtil.sleepOneClick(); // Forced ID Patient pAB = new Patient(); @@ -3216,7 +3217,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test { pAB.addName().setFamily("A").addGiven("B"); myPatientDao.update(pAB); - Thread.sleep(10); + TestUtil.sleepOneClick(); // Forced ID Patient pAA = new Patient(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java index bfcbdc6a22c..221fbeaec8c 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoFtTest.java @@ -889,10 +889,11 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { patient.addIdentifier().setSystem("urn:system").setValue("001"); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - long betweenTime = System.currentTimeMillis(); TestUtil.sleepOneClick(); + long betweenTime = System.currentTimeMillis(); + IIdType id2; { Patient patient = new Patient(); @@ -1267,6 +1268,8 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); + Date betweenTime = new Date(); TestUtil.sleepOneClick(); @@ -1444,6 +1447,7 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); long start = System.currentTimeMillis(); @@ -1455,6 +1459,9 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { patient.addIdentifier().setSystem("urn:system").setValue("001"); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + + TestUtil.sleepOneClick(); + IIdType id1b; { Patient patient = new Patient(); @@ -1863,15 +1870,15 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { obs01.setSubject(new Reference(patientId01)); IIdType obsId01 = myObservationDao.create(obs01, mySrd).getId().toUnqualifiedVersionless(); + TestUtil.sleepOneClick(); Date between = new Date(); - Thread.sleep(10); Observation obs02 = new Observation(); obs02.setEffective(new DateTimeType(new Date())); obs02.setSubject(new Reference(locId01)); IIdType obsId02 = myObservationDao.create(obs02, mySrd).getId().toUnqualifiedVersionless(); - Thread.sleep(10); + TestUtil.sleepOneClick(); Date after = new Date(); ourLog.info("P1[{}] L1[{}] Obs1[{}] Obs2[{}]", patientId01, locId01, obsId01, obsId02); @@ -1994,15 +2001,16 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); Date between = new Date(); - Thread.sleep(10); + { Patient patient = new Patient(); patient.addIdentifier().setSystem("urn:system").setValue("002"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John"); pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } - Thread.sleep(10); + TestUtil.sleepOneClick(); Date after = new Date(); SearchParameterMap params; @@ -2972,6 +2980,8 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test { tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); + Date betweenDate = new Date(); IIdType tag2id; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoHashesTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoHashesTest.java index 8300f487fd7..9d2c7036f75 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoHashesTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchNoHashesTest.java @@ -1443,7 +1443,10 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test { id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); + long start = System.currentTimeMillis(); + TestUtil.sleepOneClick(); IIdType id1a; @@ -1452,6 +1455,9 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test { patient.addIdentifier().setSystem("urn:system").setValue("001"); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + + TestUtil.sleepOneClick(); + IIdType id1b; { Patient patient = new Patient(); @@ -1992,6 +1998,7 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test { patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); } + TestUtil.sleepOneClick(); Date between = new Date(); TestUtil.sleepOneClick(); { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4Test.java index f75d4a5fbba..ea6c924d38b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4Test.java @@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.model.entity.*; import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl; import ca.uhn.fhir.jpa.searchparam.SearchParamConstants; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; +import ca.uhn.fhir.jpa.util.TestUtil; import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; import ca.uhn.fhir.model.valueset.BundleEntrySearchModeEnum; @@ -20,7 +21,6 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.param.*; import ca.uhn.fhir.rest.server.exceptions.*; import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails; -import ca.uhn.fhir.util.TestUtil; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; @@ -3159,16 +3159,22 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test { p.addName().setFamily(methodName); IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); + TestUtil.sleepOneClick(); + p = new Patient(); p.addIdentifier().setSystem("urn:system2").setValue(methodName); p.addName().setFamily(methodName); IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); + TestUtil.sleepOneClick(); + p = new Patient(); p.addIdentifier().setSystem("urn:system3").setValue(methodName); p.addName().setFamily(methodName); IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); + TestUtil.sleepOneClick(); + p = new Patient(); p.addIdentifier().setSystem("urn:system4").setValue(methodName); p.addName().setFamily(methodName); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4UpdateTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4UpdateTest.java index b1f7d1b2c4c..9301cbd1c59 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4UpdateTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4UpdateTest.java @@ -313,10 +313,10 @@ public class FhirResourceDaoR4UpdateTest extends BaseJpaR4Test { assertEquals("1", outcome.getId().getVersionIdPart()); - Date now = new Date(); - TestUtil.sleepOneClick(); + Date now = new Date(); + Patient retrieved = myPatientDao.read(outcome.getId(), mySrd); InstantType updated = TestUtil.getTimestamp(retrieved); assertTrue(updated.before(now)); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4CacheTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4CacheTest.java index 16ed5ca99cb..dd4eb173b2f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4CacheTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4CacheTest.java @@ -159,14 +159,19 @@ public class ResourceProviderR4CacheTest extends BaseResourceProviderR4Test { ourClient.create().resource(pt1).execute(); Date beforeFirst = new Date(); + TestUtil.sleepOneClick(); Bundle results1 = ourClient.search().forResource("Patient").where(Patient.FAMILY.matches().value("FAM")).returnBundle(Bundle.class).execute(); + + TestUtil.sleepOneClick(); + assertEquals(1, results1.getEntry().size()); assertEquals(1, mySearchEntityDao.count()); assertThat(myCapturingInterceptor.getLastResponse().getHeaders(Constants.HEADER_X_CACHE), empty()); - assertThat(TestUtil.getTimestamp(results1).getValue(), greaterThan(beforeFirst)); - assertThat(TestUtil.getTimestamp(results1).getValue(), lessThan(new Date())); + Date results1Date = TestUtil.getTimestamp(results1).getValue(); + assertThat(results1Date, greaterThan(beforeFirst)); + assertThat(results1Date, lessThan(new Date())); assertThat(results1.getId(), not(blankOrNullString())); Patient pt2 = new Patient(); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/LatchedService.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/LatchedService.java deleted file mode 100644 index 0bb32a97fe5..00000000000 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/LatchedService.java +++ /dev/null @@ -1,72 +0,0 @@ -package ca.uhn.fhir.jpa.subscription.module; - -import ca.uhn.fhir.jpa.model.interceptor.api.HookParams; -import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook; -import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertTrue; - -public class LatchedService implements IAnonymousLambdaHook { - private static final Logger ourLog = LoggerFactory.getLogger(LatchedService.class); - private static final int DEFAULT_TIMEOUT_SECONDS = 20; - private final String name; - - private CountDownLatch myCountdownLatch; - private AtomicReference myFailure; - private AtomicReference> myCalledWith; - - public LatchedService(Pointcut thePointcut) { - this.name = thePointcut.name(); - } - - public LatchedService(String theName) { - this.name = theName; - } - - public void countdown() { - if (myCountdownLatch == null) { - myFailure.set(name + " latch countdown() called before expectedCount set."); - } else if (myCountdownLatch.getCount() <= 0) { - myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected."); - } - ourLog.info("{} counting down {}", name, myCountdownLatch); - myCountdownLatch.countDown(); - } - - public void setExpectedCount(int count) { - myFailure = new AtomicReference<>(); - myCalledWith = new AtomicReference<>(new ArrayList<>()); - myCountdownLatch = new CountDownLatch(count); - } - - public void awaitExpected() throws InterruptedException { - awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS); - } - - public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { - assertTrue(name +" latch timed out waiting "+timeoutSecond+" seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)); - - if (myFailure.get() != null) { - String error = myFailure.get(); - error += "\nLatch called with values: "+myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", ")); - throw new AssertionError(error); - } - } - - @Override - public void invoke(HookParams theArgs) { - this.countdown(); - if (myCalledWith.get() != null) { - myCalledWith.get().add(theArgs); - } - } -} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java new file mode 100644 index 00000000000..a8d6f91468e --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java @@ -0,0 +1,95 @@ +package ca.uhn.fhir.jpa.subscription.module; + +import ca.uhn.fhir.jpa.model.interceptor.api.HookParams; +import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook; +import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class PointcutLatch implements IAnonymousLambdaHook { + private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class); + private static final int DEFAULT_TIMEOUT_SECONDS = 10; + private final String name; + + private Semaphore mySemaphore = new Semaphore(1); + private CountDownLatch myCountdownLatch; + private AtomicReference myFailure; + private AtomicReference> myCalledWith; + + public PointcutLatch(Pointcut thePointcut) { + this.name = thePointcut.name(); + } + + public PointcutLatch(String theName) { + this.name = theName; + } + + private void countdown() { + if (myCountdownLatch == null) { + myFailure.set(name + " latch countdown() called before expectedCount set."); + } else if (myCountdownLatch.getCount() <= 0) { + myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected."); + } + ourLog.info("{} counting down {}", name, myCountdownLatch); + myCountdownLatch.countDown(); + } + + public void setExpectedCount(int count) throws InterruptedException { + mySemaphore.acquire(); + if (myCountdownLatch != null) { + myFailure.set(name + " latch setExpectedCount() called before previous awaitExpected() completed."); + } + myFailure = new AtomicReference<>(); + myCalledWith = new AtomicReference<>(new ArrayList<>()); + myCountdownLatch = new CountDownLatch(count); + } + + public void awaitExpected() throws InterruptedException { + awaitExpected(true); + } + + public void awaitExpected(boolean release) throws InterruptedException { + awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS, release); + } + + public void awaitExpectedWithTimeout(int timeoutSecond, boolean release) throws InterruptedException { + try { + assertNotNull(name + " latch awaitExpected() called before previous setExpected() called.", myCountdownLatch); + assertTrue(name + " latch timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)); + + if (myFailure.get() != null) { + String error = myFailure.get(); + error += "\nLatch called with values: " + myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", ")); + throw new AssertionError(error); + } + } finally { + if (release) { + release(); + } + } + } + + public void release() { + myCountdownLatch = null; + mySemaphore.release(); + } + + @Override + public void invoke(HookParams theArgs) { + this.countdown(); + if (myCalledWith.get() != null) { + myCalledWith.get().add(theArgs); + } + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index 4cb096dba32..4269641541a 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -1,15 +1,14 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.model.interceptor.api.HookParams; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.model.interceptor.executor.InterceptorRegistry; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; -import ca.uhn.fhir.jpa.subscription.module.LatchedService; +import ca.uhn.fhir.jpa.subscription.module.PointcutLatch; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; -import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.ResourceParam; @@ -65,8 +64,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base private static SubscribableChannel ourSubscribableChannel; private List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); private long idCounter = 0; - protected LatchedService mySubscriptionMatchingPost = new LatchedService(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); - protected LatchedService mySubscriptionActivatedPost = new LatchedService(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); + protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); + protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); @Before public void beforeReset() { @@ -165,7 +164,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public static class ObservationListener implements IResourceProvider { - private LatchedService updateLatch = new LatchedService("Observation Update"); + private PointcutLatch updateLatch = new PointcutLatch("Observation Update"); @Create public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { @@ -184,17 +183,21 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); ourUpdatedObservations.add(theObservation); - updateLatch.countdown(); + updateLatch.invoke(new HookParams().add(Observation.class, theObservation)); ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size()); return new MethodOutcome(new IdType("Observation/1"), false); } - public void setExpectedCount(int count) { + public void setExpectedCount(int count) throws InterruptedException { updateLatch.setExpectedCount(count); } - public void awaitExpected() throws InterruptedException { - updateLatch.awaitExpected(); + public void awaitExpected(boolean release) throws InterruptedException { + updateLatch.awaitExpected(release); + } + + public void release() { + updateLatch.release(); } } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java index 58e5f696f38..5af2d728dff 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java @@ -27,9 +27,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(false); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + ourObservationListener.release(); } @Test @@ -45,9 +46,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(false); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); + ourObservationListener.release(); } @Test @@ -65,6 +67,6 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri mySubscriptionMatchingPost.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); mySubscriptionMatchingPost.awaitExpected(); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(true); } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java index 0bd4ac035c1..8fd2e436e4e 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java @@ -27,9 +27,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(false); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + ourObservationListener.release(); } @Test @@ -45,9 +46,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(false); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); + ourObservationListener.release(); } @Test @@ -65,6 +67,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri mySubscriptionMatchingPost.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); mySubscriptionMatchingPost.awaitExpected(); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(true); } }