final batch of windows fixes (to deal with jumpy windows clock)

also added semaphore to PointcutLatch
This commit is contained in:
Ken Stevens 2019-01-22 18:53:54 -05:00
parent 6b22977d7c
commit 2c7eb39b29
12 changed files with 173 additions and 115 deletions

View File

@ -374,6 +374,9 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
patient.addIdentifier().setSystem("urn:system").setValue("001"); patient.addIdentifier().setSystem("urn:system").setValue("001");
id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
long betweenTime = System.currentTimeMillis(); long betweenTime = System.currentTimeMillis();
IIdType id2; IIdType id2;
{ {
@ -820,8 +823,6 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
@Test @Test
public void testSearchLastUpdatedParamWithComparator() throws InterruptedException { public void testSearchLastUpdatedParamWithComparator() throws InterruptedException {
String methodName = "testSearchLastUpdatedParamWithComparator";
IIdType id0; IIdType id0;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
@ -829,18 +830,16 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
int sleep = 100;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Thread.sleep(sleep); TestUtil.sleepOneClick();
DateTimeDt beforeAny = new DateTimeDt(new Date(), TemporalPrecisionEnum.MILLI);
IIdType id1a; IIdType id1a;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
patient.addIdentifier().setSystem("urn:system").setValue("001"); patient.addIdentifier().setSystem("urn:system").setValue("001");
id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
IIdType id1b; IIdType id1b;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
@ -853,7 +852,7 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
InstantDt id1bpublished = ResourceMetadataKeyEnum.PUBLISHED.get(myPatientDao.read(id1b, mySrd)); InstantDt id1bpublished = ResourceMetadataKeyEnum.PUBLISHED.get(myPatientDao.read(id1b, mySrd));
ourLog.info("Res 3: {}", id1bpublished.getValueAsString()); ourLog.info("Res 3: {}", id1bpublished.getValueAsString());
Thread.sleep(sleep); TestUtil.sleepOneClick();
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
SearchParameterMap params; SearchParameterMap params;

View File

@ -1023,7 +1023,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
patient.addName().setFamily("testSearchLanguageParam").addGiven("Joe"); patient.addName().setFamily("testSearchLanguageParam").addGiven("Joe");
id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date betweenTime = new Date(); Date betweenTime = new Date();
IIdType id2; IIdType id2;
@ -1199,10 +1199,9 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
int sleep = 100; TestUtil.sleepOneClick();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
TestUtil.sleepAtLeast(sleep); TestUtil.sleepOneClick();
IIdType id1a; IIdType id1a;
{ {
@ -1221,7 +1220,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
ourLog.info("Res 2: {}", myPatientDao.read(id1a, mySrd).getMeta().getLastUpdatedElement().getValueAsString()); ourLog.info("Res 2: {}", myPatientDao.read(id1a, mySrd).getMeta().getLastUpdatedElement().getValueAsString());
ourLog.info("Res 3: {}", myPatientDao.read(id1b, mySrd).getMeta().getLastUpdatedElement().getValueAsString()); ourLog.info("Res 3: {}", myPatientDao.read(id1b, mySrd).getMeta().getLastUpdatedElement().getValueAsString());
TestUtil.sleepAtLeast(sleep); TestUtil.sleepOneClick();
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
SearchParameterMap map; SearchParameterMap map;
@ -1856,15 +1855,15 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe");
pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date between = new Date(); Date between = new Date();
Thread.sleep(10);
{ {
Patient patient = new Patient(); Patient patient = new Patient();
patient.addIdentifier().setSystem("urn:system").setValue("002"); patient.addIdentifier().setSystem("urn:system").setValue("002");
patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John");
pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
Thread.sleep(10); TestUtil.sleepOneClick();
Date after = new Date(); Date after = new Date();
SearchParameterMap params; SearchParameterMap params;
@ -2874,6 +2873,8 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless(); tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date betweenDate = new Date(); Date betweenDate = new Date();
IIdType tag2id; IIdType tag2id;
@ -3196,7 +3197,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
p01.addName().setFamily("B").addGiven("A"); p01.addName().setFamily("B").addGiven("A");
String id1 = myPatientDao.create(p01).getId().toUnqualifiedVersionless().getValue(); String id1 = myPatientDao.create(p01).getId().toUnqualifiedVersionless().getValue();
Thread.sleep(10); TestUtil.sleepOneClick();
// Numeric ID // Numeric ID
Patient p02 = new Patient(); Patient p02 = new Patient();
@ -3206,7 +3207,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
p02.addName().setFamily("Z").addGiven("Z"); p02.addName().setFamily("Z").addGiven("Z");
String id2 = myPatientDao.create(p02).getId().toUnqualifiedVersionless().getValue(); String id2 = myPatientDao.create(p02).getId().toUnqualifiedVersionless().getValue();
Thread.sleep(10); TestUtil.sleepOneClick();
// Forced ID // Forced ID
Patient pAB = new Patient(); Patient pAB = new Patient();
@ -3216,7 +3217,7 @@ public class FhirResourceDaoDstu3SearchNoFtTest extends BaseJpaDstu3Test {
pAB.addName().setFamily("A").addGiven("B"); pAB.addName().setFamily("A").addGiven("B");
myPatientDao.update(pAB); myPatientDao.update(pAB);
Thread.sleep(10); TestUtil.sleepOneClick();
// Forced ID // Forced ID
Patient pAA = new Patient(); Patient pAA = new Patient();

View File

@ -889,10 +889,11 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
patient.addIdentifier().setSystem("urn:system").setValue("001"); patient.addIdentifier().setSystem("urn:system").setValue("001");
id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
long betweenTime = System.currentTimeMillis();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
long betweenTime = System.currentTimeMillis();
IIdType id2; IIdType id2;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
@ -1267,6 +1268,8 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date betweenTime = new Date(); Date betweenTime = new Date();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
@ -1444,6 +1447,7 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
@ -1455,6 +1459,9 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
patient.addIdentifier().setSystem("urn:system").setValue("001"); patient.addIdentifier().setSystem("urn:system").setValue("001");
id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
IIdType id1b; IIdType id1b;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
@ -1863,15 +1870,15 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
obs01.setSubject(new Reference(patientId01)); obs01.setSubject(new Reference(patientId01));
IIdType obsId01 = myObservationDao.create(obs01, mySrd).getId().toUnqualifiedVersionless(); IIdType obsId01 = myObservationDao.create(obs01, mySrd).getId().toUnqualifiedVersionless();
TestUtil.sleepOneClick();
Date between = new Date(); Date between = new Date();
Thread.sleep(10);
Observation obs02 = new Observation(); Observation obs02 = new Observation();
obs02.setEffective(new DateTimeType(new Date())); obs02.setEffective(new DateTimeType(new Date()));
obs02.setSubject(new Reference(locId01)); obs02.setSubject(new Reference(locId01));
IIdType obsId02 = myObservationDao.create(obs02, mySrd).getId().toUnqualifiedVersionless(); IIdType obsId02 = myObservationDao.create(obs02, mySrd).getId().toUnqualifiedVersionless();
Thread.sleep(10); TestUtil.sleepOneClick();
Date after = new Date(); Date after = new Date();
ourLog.info("P1[{}] L1[{}] Obs1[{}] Obs2[{}]", patientId01, locId01, obsId01, obsId02); ourLog.info("P1[{}] L1[{}] Obs1[{}] Obs2[{}]", patientId01, locId01, obsId01, obsId02);
@ -1994,15 +2001,16 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe");
pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date between = new Date(); Date between = new Date();
Thread.sleep(10);
{ {
Patient patient = new Patient(); Patient patient = new Patient();
patient.addIdentifier().setSystem("urn:system").setValue("002"); patient.addIdentifier().setSystem("urn:system").setValue("002");
patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("John");
pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); pid2 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
Thread.sleep(10); TestUtil.sleepOneClick();
Date after = new Date(); Date after = new Date();
SearchParameterMap params; SearchParameterMap params;
@ -2972,6 +2980,8 @@ public class FhirResourceDaoR4SearchNoFtTest extends BaseJpaR4Test {
tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless(); tag1id = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date betweenDate = new Date(); Date betweenDate = new Date();
IIdType tag2id; IIdType tag2id;

View File

@ -1443,7 +1443,10 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test {
id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id0 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
IIdType id1a; IIdType id1a;
@ -1452,6 +1455,9 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test {
patient.addIdentifier().setSystem("urn:system").setValue("001"); patient.addIdentifier().setSystem("urn:system").setValue("001");
id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); id1a = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
IIdType id1b; IIdType id1b;
{ {
Patient patient = new Patient(); Patient patient = new Patient();
@ -1992,6 +1998,7 @@ public class FhirResourceDaoR4SearchNoHashesTest extends BaseJpaR4Test {
patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe"); patient.addName().setFamily("Tester_testSearchStringParam").addGiven("Joe");
pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); pid1 = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
} }
TestUtil.sleepOneClick();
Date between = new Date(); Date between = new Date();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
{ {

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.model.entity.*;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl; import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
import ca.uhn.fhir.jpa.searchparam.SearchParamConstants; import ca.uhn.fhir.jpa.searchparam.SearchParamConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.TestUtil;
import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.valueset.BundleEntrySearchModeEnum; import ca.uhn.fhir.model.valueset.BundleEntrySearchModeEnum;
@ -20,7 +21,6 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.*; import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.server.exceptions.*; import ca.uhn.fhir.rest.server.exceptions.*;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails; import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -3159,16 +3159,22 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
p.addName().setFamily(methodName); p.addName().setFamily(methodName);
IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); IIdType id1 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
TestUtil.sleepOneClick();
p = new Patient(); p = new Patient();
p.addIdentifier().setSystem("urn:system2").setValue(methodName); p.addIdentifier().setSystem("urn:system2").setValue(methodName);
p.addName().setFamily(methodName); p.addName().setFamily(methodName);
IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); IIdType id2 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
TestUtil.sleepOneClick();
p = new Patient(); p = new Patient();
p.addIdentifier().setSystem("urn:system3").setValue(methodName); p.addIdentifier().setSystem("urn:system3").setValue(methodName);
p.addName().setFamily(methodName); p.addName().setFamily(methodName);
IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless(); IIdType id3 = myPatientDao.create(p, mySrd).getId().toUnqualifiedVersionless();
TestUtil.sleepOneClick();
p = new Patient(); p = new Patient();
p.addIdentifier().setSystem("urn:system4").setValue(methodName); p.addIdentifier().setSystem("urn:system4").setValue(methodName);
p.addName().setFamily(methodName); p.addName().setFamily(methodName);

View File

@ -313,10 +313,10 @@ public class FhirResourceDaoR4UpdateTest extends BaseJpaR4Test {
assertEquals("1", outcome.getId().getVersionIdPart()); assertEquals("1", outcome.getId().getVersionIdPart());
Date now = new Date();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
Date now = new Date();
Patient retrieved = myPatientDao.read(outcome.getId(), mySrd); Patient retrieved = myPatientDao.read(outcome.getId(), mySrd);
InstantType updated = TestUtil.getTimestamp(retrieved); InstantType updated = TestUtil.getTimestamp(retrieved);
assertTrue(updated.before(now)); assertTrue(updated.before(now));

View File

@ -159,14 +159,19 @@ public class ResourceProviderR4CacheTest extends BaseResourceProviderR4Test {
ourClient.create().resource(pt1).execute(); ourClient.create().resource(pt1).execute();
Date beforeFirst = new Date(); Date beforeFirst = new Date();
TestUtil.sleepOneClick(); TestUtil.sleepOneClick();
Bundle results1 = ourClient.search().forResource("Patient").where(Patient.FAMILY.matches().value("FAM")).returnBundle(Bundle.class).execute(); Bundle results1 = ourClient.search().forResource("Patient").where(Patient.FAMILY.matches().value("FAM")).returnBundle(Bundle.class).execute();
TestUtil.sleepOneClick();
assertEquals(1, results1.getEntry().size()); assertEquals(1, results1.getEntry().size());
assertEquals(1, mySearchEntityDao.count()); assertEquals(1, mySearchEntityDao.count());
assertThat(myCapturingInterceptor.getLastResponse().getHeaders(Constants.HEADER_X_CACHE), empty()); assertThat(myCapturingInterceptor.getLastResponse().getHeaders(Constants.HEADER_X_CACHE), empty());
assertThat(TestUtil.getTimestamp(results1).getValue(), greaterThan(beforeFirst)); Date results1Date = TestUtil.getTimestamp(results1).getValue();
assertThat(TestUtil.getTimestamp(results1).getValue(), lessThan(new Date())); assertThat(results1Date, greaterThan(beforeFirst));
assertThat(results1Date, lessThan(new Date()));
assertThat(results1.getId(), not(blankOrNullString())); assertThat(results1.getId(), not(blankOrNullString()));
Patient pt2 = new Patient(); Patient pt2 = new Patient();

View File

@ -1,72 +0,0 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.jpa.model.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
public class LatchedService implements IAnonymousLambdaHook {
private static final Logger ourLog = LoggerFactory.getLogger(LatchedService.class);
private static final int DEFAULT_TIMEOUT_SECONDS = 20;
private final String name;
private CountDownLatch myCountdownLatch;
private AtomicReference<String> myFailure;
private AtomicReference<List<HookParams>> myCalledWith;
public LatchedService(Pointcut thePointcut) {
this.name = thePointcut.name();
}
public LatchedService(String theName) {
this.name = theName;
}
public void countdown() {
if (myCountdownLatch == null) {
myFailure.set(name + " latch countdown() called before expectedCount set.");
} else if (myCountdownLatch.getCount() <= 0) {
myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected.");
}
ourLog.info("{} counting down {}", name, myCountdownLatch);
myCountdownLatch.countDown();
}
public void setExpectedCount(int count) {
myFailure = new AtomicReference<>();
myCalledWith = new AtomicReference<>(new ArrayList<>());
myCountdownLatch = new CountDownLatch(count);
}
public void awaitExpected() throws InterruptedException {
awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS);
}
public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
assertTrue(name +" latch timed out waiting "+timeoutSecond+" seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS));
if (myFailure.get() != null) {
String error = myFailure.get();
error += "\nLatch called with values: "+myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", "));
throw new AssertionError(error);
}
}
@Override
public void invoke(HookParams theArgs) {
this.countdown();
if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs);
}
}
}

View File

@ -0,0 +1,95 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.jpa.model.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class PointcutLatch implements IAnonymousLambdaHook {
private static final Logger ourLog = LoggerFactory.getLogger(PointcutLatch.class);
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
private final String name;
private Semaphore mySemaphore = new Semaphore(1);
private CountDownLatch myCountdownLatch;
private AtomicReference<String> myFailure;
private AtomicReference<List<HookParams>> myCalledWith;
public PointcutLatch(Pointcut thePointcut) {
this.name = thePointcut.name();
}
public PointcutLatch(String theName) {
this.name = theName;
}
private void countdown() {
if (myCountdownLatch == null) {
myFailure.set(name + " latch countdown() called before expectedCount set.");
} else if (myCountdownLatch.getCount() <= 0) {
myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected.");
}
ourLog.info("{} counting down {}", name, myCountdownLatch);
myCountdownLatch.countDown();
}
public void setExpectedCount(int count) throws InterruptedException {
mySemaphore.acquire();
if (myCountdownLatch != null) {
myFailure.set(name + " latch setExpectedCount() called before previous awaitExpected() completed.");
}
myFailure = new AtomicReference<>();
myCalledWith = new AtomicReference<>(new ArrayList<>());
myCountdownLatch = new CountDownLatch(count);
}
public void awaitExpected() throws InterruptedException {
awaitExpected(true);
}
public void awaitExpected(boolean release) throws InterruptedException {
awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS, release);
}
public void awaitExpectedWithTimeout(int timeoutSecond, boolean release) throws InterruptedException {
try {
assertNotNull(name + " latch awaitExpected() called before previous setExpected() called.", myCountdownLatch);
assertTrue(name + " latch timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS));
if (myFailure.get() != null) {
String error = myFailure.get();
error += "\nLatch called with values: " + myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", "));
throw new AssertionError(error);
}
} finally {
if (release) {
release();
}
}
}
public void release() {
myCountdownLatch = null;
mySemaphore.release();
}
@Override
public void invoke(HookParams theArgs) {
this.countdown();
if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs);
}
}
}

View File

@ -1,15 +1,14 @@
package ca.uhn.fhir.jpa.subscription.module.standalone; package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.interceptor.executor.InterceptorRegistry; import ca.uhn.fhir.jpa.model.interceptor.executor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.LatchedService; import ca.uhn.fhir.jpa.subscription.module.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -65,8 +64,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private static SubscribableChannel ourSubscribableChannel; private static SubscribableChannel ourSubscribableChannel;
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
private long idCounter = 0; private long idCounter = 0;
protected LatchedService mySubscriptionMatchingPost = new LatchedService(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected LatchedService mySubscriptionActivatedPost = new LatchedService(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
@Before @Before
public void beforeReset() { public void beforeReset() {
@ -165,7 +164,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public static class ObservationListener implements IResourceProvider { public static class ObservationListener implements IResourceProvider {
private LatchedService updateLatch = new LatchedService("Observation Update"); private PointcutLatch updateLatch = new PointcutLatch("Observation Update");
@Create @Create
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
@ -184,17 +183,21 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourUpdatedObservations.add(theObservation); ourUpdatedObservations.add(theObservation);
updateLatch.countdown(); updateLatch.invoke(new HookParams().add(Observation.class, theObservation));
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size()); ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size());
return new MethodOutcome(new IdType("Observation/1"), false); return new MethodOutcome(new IdType("Observation/1"), false);
} }
public void setExpectedCount(int count) { public void setExpectedCount(int count) throws InterruptedException {
updateLatch.setExpectedCount(count); updateLatch.setExpectedCount(count);
} }
public void awaitExpected() throws InterruptedException { public void awaitExpected(boolean release) throws InterruptedException {
updateLatch.awaitExpected(); updateLatch.awaitExpected(release);
}
public void release() {
updateLatch.release();
} }
} }
} }

View File

@ -27,9 +27,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.setExpectedCount(1); ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(false);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationListener.release();
} }
@Test @Test
@ -45,9 +46,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.setExpectedCount(1); ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(false);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
ourObservationListener.release();
} }
@Test @Test
@ -65,6 +67,6 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionMatchingPost.setExpectedCount(1); mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
mySubscriptionMatchingPost.awaitExpected(); mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(true);
} }
} }

View File

@ -27,9 +27,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.setExpectedCount(1); ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(false);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationListener.release();
} }
@Test @Test
@ -45,9 +46,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
ourObservationListener.setExpectedCount(1); ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(false);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
ourObservationListener.release();
} }
@Test @Test
@ -65,6 +67,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
mySubscriptionMatchingPost.setExpectedCount(1); mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT"); sendObservation(code, "SNOMED-CT");
mySubscriptionMatchingPost.awaitExpected(); mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected(true);
} }
} }