Fix subscription issue
This commit is contained in:
parent
588016e406
commit
8c0b665565
|
@ -1233,7 +1233,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao {
|
||||||
boolean paramsUriPopulated = theEntity.isParamsUriPopulated();
|
boolean paramsUriPopulated = theEntity.isParamsUriPopulated();
|
||||||
boolean hasLinks = theEntity.isHasLinks();
|
boolean hasLinks = theEntity.isHasLinks();
|
||||||
|
|
||||||
Collection<ResourceIndexedSearchParamString> paramsString = new ArrayList<ResourceIndexedSearchParamString>(theEntity.getParamsString());
|
Collection<ResourceIndexedSearchParamString> paramsString = new ArrayList<ResourceIndexedSearchParamString>(theEntity.getParamsString()); // TODO: use the isPopulated method to decide whether to call this
|
||||||
Collection<ResourceIndexedSearchParamToken> paramsToken = new ArrayList<ResourceIndexedSearchParamToken>(theEntity.getParamsToken());
|
Collection<ResourceIndexedSearchParamToken> paramsToken = new ArrayList<ResourceIndexedSearchParamToken>(theEntity.getParamsToken());
|
||||||
Collection<ResourceIndexedSearchParamNumber> paramsNumber = new ArrayList<ResourceIndexedSearchParamNumber>(theEntity.getParamsNumber());
|
Collection<ResourceIndexedSearchParamNumber> paramsNumber = new ArrayList<ResourceIndexedSearchParamNumber>(theEntity.getParamsNumber());
|
||||||
Collection<ResourceIndexedSearchParamQuantity> paramsQuantity = new ArrayList<ResourceIndexedSearchParamQuantity>(theEntity.getParamsQuantity());
|
Collection<ResourceIndexedSearchParamQuantity> paramsQuantity = new ArrayList<ResourceIndexedSearchParamQuantity>(theEntity.getParamsQuantity());
|
||||||
|
|
|
@ -116,6 +116,7 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
|
||||||
retVal += txTemplate.execute(new TransactionCallback<Integer>() {
|
retVal += txTemplate.execute(new TransactionCallback<Integer>() {
|
||||||
@Override
|
@Override
|
||||||
public Integer doInTransaction(TransactionStatus theStatus) {
|
public Integer doInTransaction(TransactionStatus theStatus) {
|
||||||
|
SubscriptionTable nextSubscriptionTable = mySubscriptionTableDao.findOne(nextSubscriptionTablePid);
|
||||||
return pollForNewUndeliveredResources(nextSubscriptionTable);
|
return pollForNewUndeliveredResources(nextSubscriptionTable);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -156,7 +157,7 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
|
||||||
Date mostRecentMatch = null;
|
Date mostRecentMatch = null;
|
||||||
for (IBaseResource next : results.getResources(0, results.size())) {
|
for (IBaseResource next : results.getResources(0, results.size())) {
|
||||||
|
|
||||||
Date updated = ResourceMetadataKeyEnum.PUBLISHED.get((IResource) next).getValue();
|
Date updated = ResourceMetadataKeyEnum.UPDATED.get((IResource) next).getValue();
|
||||||
if (mostRecentMatch == null) {
|
if (mostRecentMatch == null) {
|
||||||
mostRecentMatch = updated;
|
mostRecentMatch = updated;
|
||||||
} else {
|
} else {
|
||||||
|
@ -170,6 +171,8 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
|
||||||
SubscriptionFlaggedResource nextFlag = new SubscriptionFlaggedResource();
|
SubscriptionFlaggedResource nextFlag = new SubscriptionFlaggedResource();
|
||||||
Long pid = IDao.RESOURCE_PID.get((IResource) next);
|
Long pid = IDao.RESOURCE_PID.get((IResource) next);
|
||||||
|
|
||||||
|
ourLog.info("New resource for subscription: {}", pid);
|
||||||
|
|
||||||
nextFlag.setResource(myEntityManager.find(ResourceTable.class, pid));
|
nextFlag.setResource(myEntityManager.find(ResourceTable.class, pid));
|
||||||
nextFlag.setSubscription(theSubscriptionTable);
|
nextFlag.setSubscription(theSubscriptionTable);
|
||||||
nextFlag.setVersion(next.getIdElement().getVersionIdPartAsLong());
|
nextFlag.setVersion(next.getIdElement().getVersionIdPartAsLong());
|
||||||
|
@ -181,7 +184,7 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
|
||||||
ourLog.debug("Updating most recent match for subcription {} to {}", subscription.getId().getIdPart(), new InstantDt(mostRecentMatch));
|
ourLog.debug("Updating most recent match for subcription {} to {}", subscription.getId().getIdPart(), new InstantDt(mostRecentMatch));
|
||||||
|
|
||||||
theSubscriptionTable.setMostRecentMatch(mostRecentMatch);
|
theSubscriptionTable.setMostRecentMatch(mostRecentMatch);
|
||||||
myEntityManager.merge(theSubscriptionTable);
|
mySubscriptionTableDao.save(theSubscriptionTable);
|
||||||
|
|
||||||
return results.size();
|
return results.size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -387,7 +387,7 @@ public class FhirResourceDaoDstu2SubscriptionTest extends BaseJpaDstu2Test {
|
||||||
Observation obs = new Observation();
|
Observation obs = new Observation();
|
||||||
obs.getSubject().setReference(pId);
|
obs.getSubject().setReference(pId);
|
||||||
obs.setStatus(ObservationStatusEnum.FINAL);
|
obs.setStatus(ObservationStatusEnum.FINAL);
|
||||||
myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
IIdType oId = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
||||||
|
|
||||||
Subscription subs;
|
Subscription subs;
|
||||||
|
|
||||||
|
@ -404,9 +404,13 @@ public class FhirResourceDaoDstu2SubscriptionTest extends BaseJpaDstu2Test {
|
||||||
assertNull(mySubscriptionTableDao.findOne(subsId1).getLastClientPoll());
|
assertNull(mySubscriptionTableDao.findOne(subsId1).getLastClientPoll());
|
||||||
|
|
||||||
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
|
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
|
||||||
mySystemDao.markAllResourcesForReindexing();
|
|
||||||
mySystemDao.performReindexingPass(100);
|
ourLog.info("pId: {} - oId: {}", pId, oId);
|
||||||
|
|
||||||
|
myObservationDao.update(myObservationDao.read(oId));
|
||||||
|
|
||||||
assertEquals(1, mySubscriptionDao.pollForNewUndeliveredResources());
|
assertEquals(1, mySubscriptionDao.pollForNewUndeliveredResources());
|
||||||
|
ourLog.info("Between passes");
|
||||||
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
|
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
@ -415,32 +419,29 @@ public class FhirResourceDaoDstu2SubscriptionTest extends BaseJpaDstu2Test {
|
||||||
obs = new Observation();
|
obs = new Observation();
|
||||||
obs.getSubject().setReference(pId);
|
obs.getSubject().setReference(pId);
|
||||||
obs.setStatus(ObservationStatusEnum.FINAL);
|
obs.setStatus(ObservationStatusEnum.FINAL);
|
||||||
IIdType afterId1 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
||||||
|
|
||||||
obs = new Observation();
|
obs = new Observation();
|
||||||
obs.getSubject().setReference(pId);
|
obs.getSubject().setReference(pId);
|
||||||
obs.setStatus(ObservationStatusEnum.FINAL);
|
obs.setStatus(ObservationStatusEnum.FINAL);
|
||||||
IIdType afterId2 = myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
ourLog.info("After: {}", System.currentTimeMillis());
|
ourLog.info("After: {}", System.currentTimeMillis());
|
||||||
|
|
||||||
List<IBaseResource> results;
|
assertEquals(2, mySubscriptionDao.pollForNewUndeliveredResources());
|
||||||
List<IIdType> resultIds;
|
assertEquals(3, mySubscriptionFlaggedResourceDataDao.count());
|
||||||
|
|
||||||
mySubscriptionDao.pollForNewUndeliveredResources();
|
|
||||||
assertEquals(2, mySubscriptionFlaggedResourceDataDao.count());
|
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
mySubscriptionDao.pollForNewUndeliveredResources();
|
mySubscriptionDao.pollForNewUndeliveredResources();
|
||||||
assertEquals(2, mySubscriptionFlaggedResourceDataDao.count());
|
assertEquals(3, mySubscriptionFlaggedResourceDataDao.count());
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
mySubscriptionDao.pollForNewUndeliveredResources();
|
mySubscriptionDao.pollForNewUndeliveredResources();
|
||||||
assertEquals(2, mySubscriptionFlaggedResourceDataDao.count());
|
assertEquals(3, mySubscriptionFlaggedResourceDataDao.count());
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
@ -450,12 +451,12 @@ public class FhirResourceDaoDstu2SubscriptionTest extends BaseJpaDstu2Test {
|
||||||
myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
myObservationDao.create(obs).getId().toUnqualifiedVersionless();
|
||||||
|
|
||||||
mySubscriptionDao.pollForNewUndeliveredResources();
|
mySubscriptionDao.pollForNewUndeliveredResources();
|
||||||
assertEquals(3, mySubscriptionFlaggedResourceDataDao.count());
|
assertEquals(4, mySubscriptionFlaggedResourceDataDao.count());
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
mySubscriptionDao.pollForNewUndeliveredResources();
|
mySubscriptionDao.pollForNewUndeliveredResources();
|
||||||
assertEquals(3, mySubscriptionFlaggedResourceDataDao.count());
|
assertEquals(4, mySubscriptionFlaggedResourceDataDao.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
/target
|
/target
|
||||||
/jpaserver_derby_files
|
/jpaserver_derby_files
|
||||||
*.log
|
*.log
|
||||||
|
ca.uhn.fhir.jpa.entity.ResourceTable/
|
||||||
|
|
||||||
# Created by https://www.gitignore.io
|
# Created by https://www.gitignore.io
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue