Correct subscription delivery metadata

This commit is contained in:
James Agnew 2019-01-04 10:56:21 -05:00
parent 7ba07d9f02
commit a1275874f8
13 changed files with 295 additions and 135 deletions

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.util;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -20,41 +20,74 @@ package ca.uhn.fhir.util;
* #L%
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.LinkedHashSet;
import java.net.Socket;
/**
* Provides server ports
*/
@CoverageIgnore
public class PortUtil {
private static LinkedHashSet<Integer> ourPorts = new LinkedHashSet<>();
private static final Logger ourLog = LoggerFactory.getLogger(PortUtil.class);
/*
* Non instantiable
*/
private PortUtil() {
// nothing
}
}
/**
* This is really only used for unit tests but is included in the library so it can be reused across modules. Use with caution.
* The entire purpose here is to find an available port that can then be
* bound for by server in a unit test without conflicting with other tests.
* <p>
* This is really only used for unit tests but is included in the library
* so it can be reused across modules. Use with caution.
*/
public static int findFreePort() {
ServerSocket server;
try {
int port;
do {
server = new ServerSocket(0);
server.setReuseAddress(true);
port = server.getLocalPort();
server.close();
} while (!ourPorts.add(port));
server = new ServerSocket(0);
server.setReuseAddress(true);
int port = server.getLocalPort();
server.close();
/*
* This is an attempt to make sure the port is actually
* free before releasing it. For whatever reason on Linux
* it seems like even after we close the ServerSocket there
* is a short while where it is not possible to bind the
* port, even though it should be released by then.
*
* I don't have any solid evidence that this is a good
* way to do this, but it seems to help...
*/
for (int i = 0; i < 10; i++) {
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(port), 1000);
ourLog.info("Socket still seems open");
Thread.sleep(250);
} catch (Exception e) {
break;
}
}
// ....annnd sleep a bit for the same reason.
Thread.sleep(500);
// Log who asked for the port, just in case that's useful
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
StackTraceElement previousElement = stackTraceElements[2];
ourLog.info("Returned available port {} for: {}", port, previousElement.toString());
return port;
} catch (Exception e) {
//FIXME resource leak
} catch (IOException | InterruptedException e) {
throw new Error(e);
}
}

View File

@ -58,6 +58,7 @@ import org.hibernate.Session;
import org.hibernate.internal.SessionImpl;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.hl7.fhir.r4.model.InstantType;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@ -68,6 +69,7 @@ import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.*;
@ -92,9 +94,9 @@ import static org.apache.commons.lang3.StringUtils.*;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -320,43 +322,45 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
private void doExpungeEverything() {
ourLog.info("** BEGINNING GLOBAL $expunge **");
final AtomicInteger counter = new AtomicInteger();
ourLog.info("BEGINNING GLOBAL $expunge");
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(t -> {
doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null");
doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null");
doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null");
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceHistoryTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + ResourceTable.class.getSimpleName() + " d SET d.myForcedId = null"));
counter.addAndGet(doExpungeEverythingQuery("UPDATE " + TermCodeSystem.class.getSimpleName() + " d SET d.myCurrentVersion = null"));
return null;
});
txTemplate.execute(t -> {
doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d");
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchParamPresent.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ForcedId.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamDate.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamNumber.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamQuantity.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamString.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamToken.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamUri.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedSearchParamCoords.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceIndexedCompositeStringUnique.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceLink.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchResult.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SearchInclude.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptParentChildLink.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d");
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElementTarget.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroupElement.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMapGroup.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptMap.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d");
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptProperty.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConceptDesignation.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermConcept.class.getSimpleName() + " d"));
for (TermCodeSystem next : myEntityManager.createQuery("SELECT c FROM " + TermCodeSystem.class.getName() + " c", TermCodeSystem.class).getResultList()) {
next.setCurrentVersion(null);
myEntityManager.merge(next);
@ -364,28 +368,33 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return null;
});
txTemplate.execute(t -> {
doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d");
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystemVersion.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TermCodeSystem.class.getSimpleName() + " d"));
return null;
});
txTemplate.execute(t -> {
doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d");
doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d");
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + SubscriptionTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTag.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + TagDefinition.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceHistoryTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + ResourceTable.class.getSimpleName() + " d"));
counter.addAndGet(doExpungeEverythingQuery("DELETE from " + org.hibernate.search.jpa.Search.class.getSimpleName() + " d"));
return null;
});
ourLog.info("** COMPLETED GLOBAL $expunge **");
ourLog.info("COMPLETED GLOBAL $expunge - Deleted {} rows", counter.get());
}
private void doExpungeEverythingQuery(String theQuery) {
private int doExpungeEverythingQuery(String theQuery) {
StopWatch sw = new StopWatch();
int outcome = myEntityManager.createQuery(theQuery).executeUpdate();
ourLog.info("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
if (outcome > 0) {
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
} else {
ourLog.debug("Query affected {} rows in {}: {}", outcome, sw.toString(), theQuery);
}
return outcome;
}
private void expungeCurrentVersionOfResource(Long theResourceId, AtomicInteger theRemainingCount) {
@ -744,6 +753,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
}
// TODO KHS inject a searchBuilderFactory into callers of this method and delete this method
@Override
public SearchBuilder newSearchBuilder() {
return mySearchBuilderFactory.newSearchBuilder(this);
}
@ -765,14 +775,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
}
}
private void populateResourceIdFromEntity(IBaseResourceEntity theEntity, final IBaseResource theResource) {
IIdType id = theEntity.getIdDt();
if (getContext().getVersion().getVersion().isRi()) {
id = getContext().getVersion().newIdType().setValue(id.getValue());
}
theResource.setId(id);
}
/**
* Returns true if the resource has changed (either the contents or the tags)
*/
@ -980,7 +982,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
res.getMeta().setLastUpdated(null);
res.getMeta().setVersionId(null);
populateResourceIdFromEntity(theEntity, res);
updateResourceMetadata(theEntity, res);
res.setId(res.getIdElement().withVersion(theVersion.toString()));
res.getMeta().setLastUpdated(theEntity.getUpdatedDate());
@ -1303,6 +1305,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
changed = populateResourceIntoEntity(theRequest, theResource, theEntity, true);
// FIXME: remove
ourLog.info("** Updated setting to: " + new InstantType(theUpdateTime).getValueAsString());
theEntity.setUpdated(theUpdateTime);
if (theResource instanceof IResource) {
theEntity.setLanguage(((IResource) theResource).getLanguage().getValue());
@ -1317,6 +1322,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
changed = populateResourceIntoEntity(theRequest, theResource, theEntity, false);
// FIXME: remove
ourLog.info("** Updated setting to: " + new InstantType(theUpdateTime).getValueAsString());
theEntity.setUpdated(theUpdateTime);
// theEntity.setLanguage(theResource.getLanguage().getValue());
theEntity.setIndexStatus(null);
@ -1328,7 +1336,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
if (!changed.isChanged() && !theForceUpdate && myConfig.isSuppressUpdatesWithNoChange()) {
ourLog.debug("Resource {} has not changed", theEntity.getIdDt().toUnqualified().getValue());
if (theResource != null) {
populateResourceIdFromEntity(theEntity, theResource);
updateResourceMetadata(theEntity, theResource);
}
theEntity.setUnchangedInCurrentOperation(true);
return theEntity;
@ -1408,7 +1416,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
}
if (theResource != null) {
populateResourceIdFromEntity(theEntity, theResource);
updateResourceMetadata(theEntity, theResource);
}
@ -1455,6 +1463,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
incrementId(theResource, savedEntity, theResourceId);
}
// Update version/lastUpdated so that interceptors see the correct version
updateResourceMetadata(savedEntity, theResource);
// Notify interceptors
if (!savedEntity.isUnchangedInCurrentOperation()) {
if (theRequestDetails != null) {
@ -1471,6 +1482,23 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return savedEntity;
}
protected void updateResourceMetadata(IBaseResourceEntity theEntity, IBaseResource theResource) {
IIdType id = theEntity.getIdDt();
if (getContext().getVersion().getVersion().isRi()) {
id = getContext().getVersion().newIdType().setValue(id.getValue());
}
theResource.setId(id);
if (theResource instanceof IResource) {
ResourceMetadataKeyEnum.VERSION.put((IResource) theResource, id.getVersionIdPart());
ResourceMetadataKeyEnum.UPDATED.put((IResource) theResource, theEntity.getUpdated());
} else {
IBaseMetaType meta = theResource.getMeta();
meta.setVersionId(id.getVersionIdPart());
meta.setLastUpdated(theEntity.getUpdatedDate());
}
}
private void validateChildReferences(IBase theElement, String thePath) {
if (theElement == null) {
return;

View File

@ -117,12 +117,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DaoMethodOutcome create(final T theResource) {
return create(theResource, null, true, null);
return create(theResource, null, true, new Date(), null);
}
@Override
public DaoMethodOutcome create(final T theResource, RequestDetails theRequestDetails) {
return create(theResource, null, true, theRequestDetails);
return create(theResource, null, true, new Date(), theRequestDetails);
}
@Override
@ -131,7 +131,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
public DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, RequestDetails theRequestDetails) {
public DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, Date theUpdateTimestamp, RequestDetails theRequestDetails) {
if (isNotBlank(theResource.getIdElement().getIdPart())) {
if (getContext().getVersion().getVersion().isOlderThan(FhirVersionEnum.DSTU3)) {
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "failedToCreateWithClientAssignedId", theResource.getIdElement().getIdPart());
@ -146,12 +146,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
theResource.setId(UUID.randomUUID().toString());
}
return doCreate(theResource, theIfNoneExist, thePerformIndexing, new Date(), theRequestDetails);
// FIXME: this is where one date is created
return doCreate(theResource, theIfNoneExist, thePerformIndexing, theUpdateTimestamp, theRequestDetails);
}
@Override
public DaoMethodOutcome create(final T theResource, String theIfNoneExist, RequestDetails theRequestDetails) {
return create(theResource, theIfNoneExist, true, theRequestDetails);
return create(theResource, theIfNoneExist, true, new Date(), theRequestDetails);
}
public IBaseOperationOutcome createErrorOperationOutcome(String theMessage, String theCode) {
@ -446,6 +447,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
incrementId(theResource, entity, theResource.getIdElement());
}
// Update the version/last updated in the resource so that interceptors get
// the correct version
updateResourceMetadata(entity, theResource);
// Notify JPA interceptors
if (!updatedEntity.isUnchangedInCurrentOperation()) {
if (theRequest != null) {
@ -1134,34 +1139,28 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return retVal;
}
private DaoMethodOutcome toMethodOutcome(final BaseHasResource theEntity, IBaseResource theResource) {
private DaoMethodOutcome toMethodOutcome(final ResourceTable theEntity, IBaseResource theResource) {
DaoMethodOutcome outcome = new DaoMethodOutcome();
IIdType id = theEntity.getIdDt();
if (getContext().getVersion().getVersion().isRi()) {
id = getContext().getVersion().newIdType().setValue(id.getValue());
// FIXME: can theResource ever be null? why?
IIdType id = null;
if (theResource != null) {
id = theResource.getIdElement();
}
if (id == null) {
id = ((BaseHasResource) theEntity).getIdDt();
if (getContext().getVersion().getVersion().isRi()) {
id = getContext().getVersion().newIdType().setValue(id.getValue());
}
}
outcome.setId(id);
outcome.setResource(theResource);
if (theResource != null) {
theResource.setId(id);
if (theResource instanceof IResource) {
ResourceMetadataKeyEnum.UPDATED.put((IResource) theResource, theEntity.getUpdated());
} else {
IBaseMetaType meta = theResource.getMeta();
meta.setLastUpdated(theEntity.getUpdatedDate());
}
}
outcome.setEntity(theEntity);
return outcome;
}
private DaoMethodOutcome toMethodOutcome(final ResourceTable theEntity, IBaseResource theResource) {
DaoMethodOutcome retVal = toMethodOutcome((BaseHasResource) theEntity, theResource);
retVal.setEntity(theEntity);
return retVal;
}
private ArrayList<TagDefinition> toTagList(IBaseMetaType theMeta) {
ArrayList<TagDefinition> retVal = new ArrayList<TagDefinition>();
@ -1247,7 +1246,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
entity = myEntityManager.find(ResourceTable.class, pid);
resourceId = entity.getIdDt();
} else {
return create(theResource, null, thePerformIndexing, theRequestDetails);
return create(theResource, null, thePerformIndexing, new Date(), theRequestDetails);
}
} else {
/*

View File

@ -370,7 +370,7 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(res.getClass());
res.setId((String) null);
DaoMethodOutcome outcome;
outcome = resourceDao.create(res, nextReqEntry.getRequest().getIfNoneExist(), false, theRequestDetails);
outcome = resourceDao.create(res, nextReqEntry.getRequest().getIfNoneExist(), false, theUpdateTime, theRequestDetails);
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res);
theEntriesToProcess.put(nextRespEntry, outcome.getEntity());
if (outcome.getCreated() == false) {

View File

@ -71,9 +71,10 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
/**
* @param thePerformIndexing Use with caution! If you set this to false, you need to manually perform indexing or your resources
* won't be indexed and searches won't work.
* @param theUpdateTimestamp
* @param theRequestDetails TODO
*/
DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, RequestDetails theRequestDetails);
DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, Date theUpdateTimestamp, RequestDetails theRequestDetails);
DaoMethodOutcome create(T theResource, String theIfNoneExist, RequestDetails theRequestDetails);

View File

@ -567,7 +567,7 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
DaoMethodOutcome outcome;
String matchUrl = myVersionAdapter.getEntryRequestIfNoneExist(nextReqEntry);
matchUrl = performIdSubstitutionsInMatchUrl(theIdSubstitutions, matchUrl);
outcome = resourceDao.create(res, matchUrl, false, theRequestDetails);
outcome = resourceDao.create(res, matchUrl, false, theUpdateTime, theRequestDetails);
if (nextResourceId != null) {
handleTransactionCreateOrUpdateOutcome(theIdSubstitutions, theIdToPersistedOutcome, nextResourceId, outcome, nextRespEntry, resourceType, res, theRequestDetails);
}

View File

@ -47,7 +47,6 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
static final String SUBSCRIPTION_STATUS = "Subscription.status";
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
private static boolean ourForcePayloadEncodeAndDecodeForUnitTests;
private SubscribableChannel myProcessingChannel;
@Autowired
@ -95,9 +94,6 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
if (ourForcePayloadEncodeAndDecodeForUnitTests) {
msg.clearPayloadDecoded();
}
submitResourceModified(msg);
}
@ -117,11 +113,6 @@ public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAd
sendToProcessingChannel(theMsg);
}
@VisibleForTesting
public static void setForcePayloadEncodeAndDecodeForUnitTests(boolean theForcePayloadEncodeAndDecodeForUnitTests) {
ourForcePayloadEncodeAndDecodeForUnitTests = theForcePayloadEncodeAndDecodeForUnitTests;
}
@VisibleForTesting
public LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingQueueSubscribableChannel) myProcessingChannel;

View File

@ -3,7 +3,6 @@ package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.context.annotation.Bean;
@ -97,7 +96,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
DataSource dataSource = ProxyDataSourceBuilder
.create(retVal)
.logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
// .logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
// .countQuery(new ThreadQueryCountHolder())
.beforeQuery(new BlockLargeNumbersOfParamsListener())

View File

@ -36,8 +36,6 @@ import java.util.List;
public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseSubscriptionsR4Test.class);
private static int ourListenerPort;
private static RestfulServer ourListenerRestServer;
private static Server ourListenerServer;
protected static List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
protected static List<String> ourHeaders = Collections.synchronizedList(new ArrayList<>());
@ -54,15 +52,13 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
protected static List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static String ourListenerServerBase;
private static String ourListenerServerBase;
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@After
public void afterUnregisterRestHookListener() {
SubscriptionMatcherInterceptor.setForcePayloadEncodeAndDecodeForUnitTests(false);
for (IIdType next : mySubscriptionIds) {
IIdType nextId = next.toUnqualifiedVersionless();
ourLog.info("Deleting: {}", nextId);
@ -106,7 +102,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
}
protected Subscription createSubscription(String theCriteria, String thePayload) throws InterruptedException {
protected Subscription createSubscription(String theCriteria, String thePayload) {
Subscription subscription = newSubscription(theCriteria, thePayload);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
@ -144,6 +140,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
Coding coding = codeableConcept.addCoding();
coding.setCode(code);
coding.setSystem(system);
@ -209,8 +206,8 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
@BeforeClass
public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort();
ourListenerRestServer = new RestfulServer(FhirContext.forR4());
int ourListenerPort = PortUtil.findFreePort();
RestfulServer ourListenerRestServer = new RestfulServer(FhirContext.forR4());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener();

View File

@ -2,7 +2,6 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
@ -62,6 +61,122 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
}
@Test
public void testUpdatesHaveCorrectMetadata() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?";
createSubscription(criteria1, payload);
waitForActivatedSubscriptionCount(1);
/*
* Send version 1
*/
Observation obs = sendObservation(code, "SNOMED-CT");
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
// Should see 1 subscription notification
waitForQueueToDrain();
int idx = 0;
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
/*
* Send version 2
*/
obs.getIdentifierFirstRep().setSystem("foo").setValue("2");
myObservationDao.update(obs);
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
// Should see 1 subscription notification
waitForQueueToDrain();
idx++;
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
}
@Test
public void testUpdatesHaveCorrectMetadataUsingTransactions() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?";
createSubscription(criteria1, payload);
waitForActivatedSubscriptionCount(1);
/*
* Send version 1
*/
Observation observation = new Observation();
observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT");
observation.setStatus(Observation.ObservationStatus.FINAL);
Bundle bundle = new Bundle();
bundle.setType(Bundle.BundleType.TRANSACTION);
bundle.addEntry().setResource(observation).getRequest().setMethod(Bundle.HTTPVerb.POST).setUrl("Observation");
Bundle responseBundle = mySystemDao.transaction(null, bundle);
Observation obs = myObservationDao.read(new IdType(responseBundle.getEntry().get(0).getResponse().getLocation()));
// Should see 1 subscription notification
waitForQueueToDrain();
int idx = 0;
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
/*
* Send version 2
*/
observation = new Observation();
observation.setId(obs.getId());
observation.getIdentifierFirstRep().setSystem("foo").setValue("2");
observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT");
observation.setStatus(Observation.ObservationStatus.FINAL);
bundle = new Bundle();
bundle.setType(Bundle.BundleType.TRANSACTION);
bundle.addEntry().setResource(observation).getRequest().setMethod(Bundle.HTTPVerb.PUT).setUrl(obs.getIdElement().toUnqualifiedVersionless().getValue());
mySystemDao.transaction(null,bundle);
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
// Should see 1 subscription notification
waitForQueueToDrain();
idx++;
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
}
@Test
public void testActiveSubscriptionShouldntReActivate() throws Exception {
String criteria = "Observation?code=111111111&_format=xml";
@ -482,8 +597,6 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
@Test
public void testSubscriptionTriggerViaSubscription() throws Exception {
SubscriptionMatcherInterceptor.setForcePayloadEncodeAndDecodeForUnitTests(true);
String payload = "application/xml";
String code = "1000000050";

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -35,8 +35,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceModifiedMessage implements IResourceMessage {
private static final long serialVersionUID = 1L;
@JsonProperty("resourceId")
private String myId;
@JsonProperty("operationType")
@ -66,6 +64,7 @@ public class ResourceModifiedMessage implements IResourceMessage {
}
}
@Override
public String getPayloadId() {
return myPayloadId;
}
@ -108,20 +107,16 @@ public class ResourceModifiedMessage implements IResourceMessage {
}
}
public void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
private void setNewPayload(FhirContext theCtx, IBaseResource theNewPayload) {
/*
* Note: Don't set myPayloadDecoded in here- This is a false optimization since
* it doesn't actually get used if anyone is doing subscriptions at any
* scale using a queue engine, and not going through the serialize/deserialize
* as we would in a queue engine can mask bugs.
* -JA
*/
myPayload = theCtx.newJsonParser().encodeResourceToString(theNewPayload);
myPayloadId = theNewPayload.getIdElement().toUnqualified().getValue();
myPayloadDecoded = theNewPayload;
}
/**
* This is mostly useful for unit tests - Clear the decoded payload so that
* we force the encoded version to be used later. This proves that we get the same
* behaviour in environments with serializing queues as we do with in-memory
* queues.
*/
public void clearPayloadDecoded() {
myPayloadDecoded = null;
}
@ -129,7 +124,7 @@ public class ResourceModifiedMessage implements IResourceMessage {
CREATE,
UPDATE,
DELETE,
MANUALLY_TRIGGERED;
MANUALLY_TRIGGERED
}

View File

@ -227,6 +227,10 @@
_included resources failed with an SQL exception stating that too many parameters were used. Search
include logic has been reworked to avoid this.
</action>
<action type="fix">
JPA Subscription deliveries did not always include the accurate versionId if the Subscription
module was configured to use an external queuing engine. This has been corrected.
</action>
</release>
<release version="3.6.0" date="2018-11-12" description="Food">
<action type="add">