Avoid version conflicts on current writes (#1971)

* Add auto retry of transactions

* Experiment in Maven build

* Transaction autoretry

* Work on transaction retries

* Work on transaction retry

* Work on transaction retry

* Fix tests

* Avoid version conflicts

* Add changelog

* Resolve LGTM issue

* FIx transaction scope error

* Test fixes

* Test fix

* Test fixes

* Test fix

* Test fixes
This commit is contained in:
James Agnew 2020-07-13 08:51:13 -04:00 committed by GitHub
parent e565b1c948
commit baba4cc240
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1368 additions and 375 deletions

View File

@ -25,16 +25,6 @@ jobs:
inputs:
targetType: 'inline'
script: mkdir -p $(MAVEN_CACHE_FOLDER); pwd; ls -al $(MAVEN_CACHE_FOLDER)
# - task: Maven@3
#env:
# JAVA_HOME_11_X64: /usr/local/openjdk-11
# inputs:
# goals: 'dependency:go-offline'
# # These are Maven CLI options (and show up in the build logs) - "-nsu"=Don't update snapshots. We can remove this when Maven OSS is more healthy
# options: '-P ALLMODULES,JACOCO,CI,ERRORPRONE -e -B -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
# # These are JVM options (and don't show up in the build logs)
# mavenOptions: '-Xmx1024m $(MAVEN_OPTS) -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss,SSS -Duser.timezone=America/Toronto'
# jdkVersionOption: 1.11
- task: Maven@3
env:
JAVA_HOME_11_X64: /usr/local/openjdk-11

View File

@ -854,6 +854,21 @@ public enum Pointcut {
*/
SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED(void.class, "ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription"),
/**
* <b>Subscription Hook:</b>
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
* a subscription
* <p>
* Hooks may make changes to the canonicalized subscription and this will have an effect
* on processing across this server. Note however that timing issues may occur, since the
* subscription is already technically live by the time this hook is called.
* </p>
* No parameters are currently supported.
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED(void.class),
/**
* <b>Storage Hook:</b>
@ -1562,6 +1577,42 @@ public enum Pointcut {
"ca.uhn.fhir.rest.server.servlet.ServletRequestDetails"
),
/**
* <b>Storage Hook:</b>
* Invoked when a transaction has been rolled back as a result of a {@link ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException},
* meaning that a database constraint has been violated. This pointcut allows an interceptor to specify a resolution strategy
* other than simply returning the error to the client. This interceptor will be fired after the database transaction rollback
* has been completed.
* <p>
* Hooks may accept the following parameters:
* </p>
* <ul>
* <li>
* ca.uhn.fhir.rest.api.server.RequestDetails - A bean containing details about the request that is about to be processed, including details such as the
* resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
* pulled out of the servlet request. Note that the bean
* properties are not all guaranteed to be populated, depending on how early during processing the
* exception occurred. <b>Note that this parameter may be null in contexts where the request is not
* known, such as while processing searches</b>
* </li>
* <li>
* ca.uhn.fhir.rest.server.servlet.ServletRequestDetails - A bean containing details about the request that is about to be processed, including details such as the
* resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
* pulled out of the servlet request. This parameter is identical to the RequestDetails parameter above but will
* only be populated when operating in a RestfulServer implementation. It is provided as a convenience.
* </li>
* </ul>
* <p>
* Hooks should return <code>ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy</code>. Hooks should not
* throw any exception.
* </p>
*/
STORAGE_VERSION_CONFLICT(
"ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy",
"ca.uhn.fhir.rest.api.server.RequestDetails",
"ca.uhn.fhir.rest.server.servlet.ServletRequestDetails"
),
/**
* <b>EMPI Hook:</b>
* Invoked whenever a persisted Patient/Practitioner resource (a resource that has just been stored in the

View File

@ -37,6 +37,9 @@ public class ResourceVersionConflictException extends BaseServerResponseExceptio
public static final int STATUS_CODE = Constants.STATUS_HTTP_409_CONFLICT;
private static final long serialVersionUID = 1L;
/**
* Constructor
*/
public ResourceVersionConflictException(String error) {
super(STATUS_CODE, error);
}

View File

@ -13,6 +13,8 @@ import ca.uhn.fhir.context.RuntimeExtensionDtDefinition;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.api.IElement;
import ca.uhn.fhir.model.api.IIdentifiableElement;
import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ISupportsUndeclaredExtensions;
import ca.uhn.fhir.model.base.composite.BaseContainedDt;
@ -21,6 +23,7 @@ import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.parser.DataFormatException;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseElement;
import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
import org.hl7.fhir.instance.model.api.IBaseHasModifierExtensions;
@ -128,6 +131,28 @@ public class FhirTerser {
Validate.notNull(theSource, "theSource must not be null");
Validate.notNull(theTarget, "theTarget must not be null");
// DSTU3+
if (theSource instanceof IBaseElement) {
IBaseElement source = (IBaseElement) theSource;
IBaseElement target = (IBaseElement) theTarget;
target.setId(source.getId());
}
// DSTU2 only
if (theSource instanceof IIdentifiableElement) {
IIdentifiableElement source = (IIdentifiableElement) theSource;
IIdentifiableElement target = (IIdentifiableElement) theTarget;
target.setElementSpecificId(source.getElementSpecificId());
}
// DSTU2 only
if (theSource instanceof IResource) {
IResource source = (IResource) theSource;
IResource target = (IResource) theTarget;
target.setId(source.getId());
target.getResourceMetadata().putAll(source.getResourceMetadata());
}
if (theSource instanceof IPrimitiveType<?>) {
if (theTarget instanceof IPrimitiveType<?>) {
((IPrimitiveType<?>) theTarget).setValueAsString(((IPrimitiveType<?>) theSource).getValueAsString());
@ -159,7 +184,13 @@ public class FhirTerser {
}
BaseRuntimeElementDefinition<?> element = myContext.getElementDefinition(nextValue.getClass());
IBase target = element.newInstance();
Object instanceConstructorArg = targetChild.getInstanceConstructorArguments();
IBase target;
if (instanceConstructorArg != null) {
target = element.newInstance(instanceConstructorArg);
} else {
target = element.newInstance();
}
targetChild.getMutator().addValue(theTarget, target);
cloneInto(nextValue, target, theIgnoreMissingFields);

View File

@ -134,14 +134,16 @@ public class CreatePackageCommand extends BaseCommand {
}
String[] dependencies = theCommandLine.getOptionValues(DEPENDENCY_OPT);
for (String nextDependencyString : dependencies) {
int colonIdx = nextDependencyString.indexOf(":");
if (colonIdx == -1) {
throw new ParseException("Invalid dependency spec: " + nextDependencyString);
if (dependencies != null) {
for (String nextDependencyString : dependencies) {
int colonIdx = nextDependencyString.indexOf(":");
if (colonIdx == -1) {
throw new ParseException("Invalid dependency spec: " + nextDependencyString);
}
String depName = nextDependencyString.substring(0, colonIdx);
String depVersion = nextDependencyString.substring(colonIdx + 1);
manifestGenerator.dependency(depName, depVersion);
}
String depName = nextDependencyString.substring(0, colonIdx);
String depVersion = nextDependencyString.substring(colonIdx+1);
manifestGenerator.dependency(depName, depVersion);
}
myWorkDirectory = Files.createTempDir();

View File

@ -113,6 +113,54 @@ public class CreatePackageCommandTest extends BaseTest {
}
@Test
public void testCreatePackage_NoDependencies() throws IOException {
StructureDefinition sd = new StructureDefinition();
sd.setUrl("http://foo/1");
writeFile(sd, "foo1.json");
ValueSet vs = new ValueSet();
vs.setUrl("http://foo/2");
writeFile(vs, "foo2.json");
App.main(new String[]{
"create-package",
"--fhir-version", "R4",
"--name", "com.example.ig",
"--version", "1.0.1",
"--include-package", myWorkDirectory.getAbsolutePath() + "/*.json",
"--target-directory", myTargetDirectory.getAbsolutePath()
});
Archiver archiver = ArchiverFactory.createArchiver("tar", "gz");
File igArchive = new File(myTargetDirectory, "com.example.ig-1.0.1.tgz");
archiver.extract(igArchive, myExtractDirectory);
List<String> allFiles = FileUtils.listFiles(myExtractDirectory, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE)
.stream()
.map(t -> t.getPath())
.sorted()
.collect(Collectors.toList());
ourLog.info("Archive contains files:\n * {}", allFiles.stream().collect(Collectors.joining("\n * ")));
// Verify package.json
String packageJsonContents = IOUtils.toString(new FileInputStream(new File(myExtractDirectory, "package/package.json")), Charsets.UTF_8);
ourLog.info("Package.json:\n{}", packageJsonContents);
String expectedPackageJson = "{\n" +
" \"name\": \"com.example.ig\",\n" +
" \"version\": \"1.0.1\"\n" +
"}";
assertEquals(expectedPackageJson, packageJsonContents);
// Try parsing the module again to make sure we can
NpmPackage loadedPackage = NpmPackage.fromPackage(new FileInputStream(igArchive));
assertEquals("com.example.ig", loadedPackage.name());
}
public void writeFile(IBaseResource theResource, String theFileName) throws IOException {
try (FileWriter w = new FileWriter(new File(myWorkDirectory, theFileName), false)) {
myContext.newJsonParser().encodeResourceToWriter(theResource, w);

View File

@ -53,9 +53,9 @@ public class ServerOperations {
ourLog.info("Received call with content type {} and {} bytes", contentType, bytes.length);
theServletResponse.setContentType(contentType);
theServletResponse.getOutputStream().write(bytes);
theServletResponse.getOutputStream().close();
theServletResponse.setContentType("text/plain");
theServletResponse.getWriter().write("hello");
theServletResponse.getWriter().close();
}
//END SNIPPET: manualInputAndOutput

View File

@ -0,0 +1,6 @@
---
type: add
issue: 1971
title: "A new interceptor called `UserRequestRetryVersionConflictsInterceptor` has been added to the JPA server. This interceptor
allows clients to instruct the server to attempt to avoid returning an HTTP 409 (Version Conflict) if two concurrent client
requests try to update the same resource at the same time."

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 1971
title: The create-package CLI command failed with a NPE if no package dependencies were specified. This has been corrected.

View File

@ -178,3 +178,10 @@ The ResponseSizeCapturingInterceptor can be used to capture the number of charac
* [ResponseSizeCapturingInterceptor JavaDoc](/apidocs/hapi-fhir-server/ca/uhn/fhir/rest/server/interceptor/ResponseSizeCapturingInterceptor.html)
* [ResponseSizeCapturingInterceptor Source](https://github.com/jamesagnew/hapi-fhir/blob/master/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/ResponseSizeCapturingInterceptor.java)
# JPA Server: Allow Cascading Deletes
The CascadingDeleteInterceptor allows clients to request deletes be cascaded to other resources that contain incoming references. See [Cascading Deletes](/docs/server_jpa/configuration.html#cascading-deletes) for more information.
# JPA Server: Retry on Version Conflicts
The UserRequestRetryVersionConflictsInterceptor allows clients to request that the server avoid version conflicts (HTTP 409) when two concurrent client requests attempt to modify the same resource. See [Version Conflicts](/docs/server_jpa/configuration.html#retry-on-version-conflict) for more information.

View File

@ -103,10 +103,26 @@ Cache-Control: no-store, max-results=20
* [This page](https://www.openhealthhub.org/t/hapi-terminology-server-uk-snomed-ct-import/592) has information on loading national editions (UK specifically) of SNOMED CT files into the database.
<a name="cascading-deletes"/>
# Cascading Deletes
An interceptor called `CascadingDeleteInterceptor` may be registered against the Server. When this interceptor is enabled, cascading deletes may be performed using either of the following:
An interceptor called `CascadingDeleteInterceptor` may be registered against the server. When this interceptor is enabled, cascading deletes may be performed using either of the following:
* The request may include the following parameter: `_cascade=delete`
* The request may include the following header: `X-Cascade: delete`
<a name="retry-on-version-conflict"/>
# Version Conflicts
If a server is serving multiple concurrent requests against the same resource, a [ResourceVersionConflictException](/hapi-fhir/apidocs/hapi-fhir-base/ca/uhn/fhir/rest/server/exceptions/ResourceVersionConflictException.html) may be thrown (resulting in an **HTTP 409 Version Conflict** being returned to the client). For example, if two client requests attempt to update the same resource at the exact same time, this exception will be thrown for one of the requests. This exception is not a bug in the server itself, but instead is a defense against client updates accidentally being lost because of concurrency issues. When this occurs, it is important to consider what the root cause might be, since concurrent writes against the same resource are often indicative of a deeper application design issue.
An interceptor called `UserRequestRetryVersionConflictsInterceptor` may be registered against the server. When this interceptor is enabled, requests may include an optional header requesting for the server to try to avoid returning an error due to concurrent writes. The server will then try to avoid version conflict errors by automatically retrying requests that would have otherwise failed due to a version conflict.
With this interceptor in place, the following header can be added to individual HTTP requests to instruct the server to avoid version conflict errors:
```http
X-Retry-On-Version-Conflict: retry; max-retries=100
```

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.jpa.api.model;
import org.apache.commons.lang3.Validate;
/**
* @since 5.1.0
*/
public class ResourceVersionConflictResolutionStrategy {
private int myMaxRetries;
private boolean myRetry;
public int getMaxRetries() {
return myMaxRetries;
}
public void setMaxRetries(int theMaxRetries) {
Validate.isTrue(theMaxRetries >= 0, "theRetryUpToMillis must not be negative");
myMaxRetries = theMaxRetries;
}
public boolean isRetry() {
return myRetry;
}
public void setRetry(boolean theRetry) {
myRetry = theRetry;
}
}

View File

@ -163,8 +163,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>net.ttddyy</groupId>
<artifactId>datasource-proxy</artifactId>
@ -734,7 +732,8 @@
<version>dstu2</version>
<configPackageBase>ca.uhn.fhir.jpa.config</configPackageBase>
<packageBase>ca.uhn.fhir.jpa.rp.dstu2</packageBase>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-dstu2.xml</targetResourceSpringBeansFile>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-dstu2.xml
</targetResourceSpringBeansFile>
<baseResourceNames/>
<excludeResourceNames>
<!-- <excludeResourceName>OperationDefinition</excludeResourceName> <excludeResourceName>OperationOutcome</excludeResourceName> -->
@ -750,7 +749,8 @@
<version>dstu3</version>
<configPackageBase>ca.uhn.fhir.jpa.config</configPackageBase>
<packageBase>ca.uhn.fhir.jpa.rp.dstu3</packageBase>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-dstu3.xml</targetResourceSpringBeansFile>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-dstu3.xml
</targetResourceSpringBeansFile>
<baseResourceNames></baseResourceNames>
<excludeResourceNames>
</excludeResourceNames>
@ -765,7 +765,8 @@
<version>r4</version>
<configPackageBase>ca.uhn.fhir.jpa.config</configPackageBase>
<packageBase>ca.uhn.fhir.jpa.rp.r4</packageBase>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-r4.xml</targetResourceSpringBeansFile>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-r4.xml
</targetResourceSpringBeansFile>
<baseResourceNames></baseResourceNames>
<excludeResourceNames>
</excludeResourceNames>
@ -780,7 +781,8 @@
<version>r5</version>
<configPackageBase>ca.uhn.fhir.jpa.config</configPackageBase>
<packageBase>ca.uhn.fhir.jpa.rp.r5</packageBase>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-r5.xml</targetResourceSpringBeansFile>
<targetResourceSpringBeansFile>hapi-fhir-server-resourceproviders-r5.xml
</targetResourceSpringBeansFile>
<baseResourceNames></baseResourceNames>
</configuration>
</execution>

View File

@ -21,6 +21,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.index.DaoResourceLinkResolver;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.graphql.JpaStorageServices;
import ca.uhn.fhir.jpa.interceptor.JpaConsentContextServices;
@ -304,6 +305,11 @@ public abstract class BaseConfig {
return new PersistenceExceptionTranslationPostProcessor();
}
@Bean
public HapiTransactionService hapiTransactionService() {
return new HapiTransactionService();
}
@Bean
public IInterceptorService jpaInterceptorService() {
return new InterceptorService();

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedCompositeStringUnique;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.hibernate.HibernateException;
import org.hibernate.StaleStateException;
import org.hibernate.PessimisticLockException;
import org.hibernate.exception.ConstraintViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,6 +35,7 @@ import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import javax.persistence.PersistenceException;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
@ -76,13 +77,14 @@ public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
* will return it as lowercase even though the definition is in caps.
*/
if (isNotBlank(constraintName)) {
if (constraintName.toUpperCase().contains(ResourceHistoryTable.IDX_RESVER_ID_VER)) {
constraintName = constraintName.toUpperCase();
if (constraintName.contains(ResourceHistoryTable.IDX_RESVER_ID_VER)) {
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceVersionConstraintFailure"));
}
if (constraintName.toUpperCase().contains(ResourceIndexedCompositeStringUnique.IDX_IDXCMPSTRUNIQ_STRING)) {
if (constraintName.contains(ResourceIndexedCompositeStringUnique.IDX_IDXCMPSTRUNIQ_STRING)) {
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceIndexedCompositeStringUniqueConstraintFailure"));
}
if (constraintName.toUpperCase().contains(ForcedId.IDX_FORCEDID_TYPE_FID)) {
if (constraintName.contains(ForcedId.IDX_FORCEDID_TYPE_FID)) {
throw new ResourceVersionConflictException(messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "forcedIdConstraintFailure"));
}
}
@ -102,10 +104,18 @@ public class HapiFhirHibernateJpaDialect extends HibernateJpaDialect {
* class in a method called "checkBatched" currently. This can all be tested using the
* StressTestR4Test method testMultiThreadedUpdateSameResourceInTransaction()
*/
if (theException instanceof StaleStateException) {
if (theException instanceof org.hibernate.StaleStateException) {
String msg = messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceVersionConstraintFailure");
throw new ResourceVersionConflictException(msg);
}
if (theException instanceof org.hibernate.PessimisticLockException) {
PessimisticLockException ex = (PessimisticLockException) theException;
String sql = defaultString(ex.getSQL()).toUpperCase();
if (sql.contains(ResourceHistoryTable.HFJ_RES_VER)) {
String msg = messageToPrepend + myLocalizer.getMessage(HapiFhirHibernateJpaDialect.class, "resourceVersionConstraintFailure");
throw new ResourceVersionConflictException(msg);
}
}
return super.convertHibernateAccessException(theException);
}

View File

@ -79,6 +79,7 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
@ -113,8 +114,10 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;

View File

@ -33,10 +33,9 @@ import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.delete.DeleteConflictService;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.patch.FhirPatch;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.BaseHasResource;
import ca.uhn.fhir.jpa.model.entity.BaseTag;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
@ -46,15 +45,15 @@ import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.patch.FhirPatch;
import ca.uhn.fhir.jpa.patch.JsonPatchUtils;
import ca.uhn.fhir.jpa.patch.XmlPatchUtils;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.jpa.patch.JsonPatchUtils;
import ca.uhn.fhir.jpa.patch.XmlPatchUtils;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.CacheControlDirective;
@ -70,6 +69,8 @@ import ca.uhn.fhir.rest.api.server.IPreResourceShowDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SimplePreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.SimplePreResourceShowDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
@ -108,6 +109,7 @@ import org.springframework.transaction.support.TransactionSynchronizationAdapter
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.persistence.NoResultException;
import javax.persistence.TypedQuery;
@ -124,7 +126,6 @@ import java.util.UUID;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Transactional(propagation = Propagation.REQUIRED)
public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends BaseHapiFhirDao<T> implements IFhirResourceDao<T> {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirResourceDao.class);
@ -151,8 +152,11 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private IRequestPartitionHelperSvc myRequestPartitionHelperService;
@Autowired
private PartitionSettings myPartitionSettings;
@Autowired
private HapiTransactionService myTransactionService;
@Override
@Transactional
public void addTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel, RequestDetails theRequest) {
StopWatch w = new StopWatch();
BaseHasResource entity = readEntity(theId, theRequest);
@ -197,8 +201,20 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return create(theResource, theIfNoneExist, null);
}
@Override
public DaoMethodOutcome create(final T theResource, String theIfNoneExist, RequestDetails theRequestDetails) {
return create(theResource, theIfNoneExist, true, new TransactionDetails(), theRequestDetails);
}
@Override
public DaoMethodOutcome create(T theResource, String theIfNoneExist, boolean thePerformIndexing, TransactionDetails theTransactionDetails, RequestDetails theRequestDetails) {
return myTransactionService.execute(theRequestDetails, tx -> doCreateForPost(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails));
}
/**
* Called for FHIR create (POST) operations
*/
private DaoMethodOutcome doCreateForPost(T theResource, String theIfNoneExist, boolean thePerformIndexing, TransactionDetails theTransactionDetails, RequestDetails theRequestDetails) {
if (theResource == null) {
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "missingBody");
throw new InvalidRequestException(msg);
@ -220,241 +236,14 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineCreatePartitionForRequest(theRequestDetails, theResource, getResourceName());
return doCreate(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails, requestPartitionId);
}
@Override
public DaoMethodOutcome create(final T theResource, String theIfNoneExist, RequestDetails theRequestDetails) {
return create(theResource, theIfNoneExist, true, new TransactionDetails(), theRequestDetails);
}
private IInstanceValidatorModule getInstanceValidator() {
return myInstanceValidator;
}
@Override
public DaoMethodOutcome delete(IIdType theId) {
return delete(theId, null);
}
@Override
public DaoMethodOutcome delete(IIdType theId, DeleteConflictList theDeleteConflicts, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
validateIdPresentForDelete(theId);
validateDeleteEnabled();
final ResourceTable entity = readEntityLatestVersion(theId, theRequest);
if (theId.hasVersionIdPart() && Long.parseLong(theId.getVersionIdPart()) != entity.getVersion()) {
throw new ResourceVersionConflictException("Trying to delete " + theId + " but this is not the current version");
}
// Don't delete again if it's already deleted
if (entity.getDeleted() != null) {
DaoMethodOutcome outcome = new DaoMethodOutcome();
outcome.setEntity(entity);
IIdType id = getContext().getVersion().newIdType();
id.setValue(entity.getIdDt().getValue());
outcome.setId(id);
IBaseOperationOutcome oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", 1, 0);
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
outcome.setOperationOutcome(oo);
return outcome;
}
StopWatch w = new StopWatch();
T resourceToDelete = toResource(myResourceType, entity, null, false);
theDeleteConflicts.setResourceIdMarkedForDeletion(theId);
// Notify IServerOperationInterceptors about pre-action call
HookParams hook = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, theTransactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRESTORAGE_RESOURCE_DELETED, hook);
myDeleteConflictService.validateOkToDelete(theDeleteConflicts, entity, false, theRequest, theTransactionDetails);
preDelete(resourceToDelete, entity);
// Notify interceptors
if (theRequest != null) {
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequest, getContext(), theId.getResourceType(), theId);
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
}
ResourceTable savedEntity = updateEntityForDelete(theRequest, theTransactionDetails, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
HookParams hookParams = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, theTransactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED, hookParams);
}
});
DaoMethodOutcome outcome = toMethodOutcome(theRequest, savedEntity, resourceToDelete).setCreated(true);
IBaseOperationOutcome oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", 1, w.getMillis());
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
outcome.setOperationOutcome(oo);
return outcome;
}
@Override
public DaoMethodOutcome delete(IIdType theId, RequestDetails theRequestDetails) {
validateIdPresentForDelete(theId);
validateDeleteEnabled();
DeleteConflictList deleteConflicts = new DeleteConflictList();
if (isNotBlank(theId.getValue())) {
deleteConflicts.setResourceIdMarkedForDeletion(theId);
}
StopWatch w = new StopWatch();
DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, new TransactionDetails());
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
ourLog.debug("Processed delete on {} in {}ms", theId.getValue(), w.getMillisAndRestart());
return retVal;
return doCreateForPostOrPut(theResource, theIfNoneExist, thePerformIndexing, theTransactionDetails, theRequestDetails, requestPartitionId);
}
/**
* This method gets called by {@link #deleteByUrl(String, DeleteConflictList, RequestDetails)} as well as by
* transaction processors
* Called both for FHIR create (POST) operations (via {@link #doCreateForPost(IBaseResource, String, boolean, TransactionDetails, RequestDetails)}
* as well as for FHIR update (PUT) where we're doing a create-with-client-assigned-ID (via {@link #doUpdate(IBaseResource, String, boolean, boolean, RequestDetails, TransactionDetails)}.
*/
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, DeleteConflictList deleteConflicts, RequestDetails theRequest) {
validateDeleteEnabled();
StopWatch w = new StopWatch();
Set<ResourcePersistentId> resourceIds = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType, theRequest);
if (resourceIds.size() > 1) {
if (myDaoConfig.isAllowMultipleDelete() == false) {
throw new PreconditionFailedException(getContext().getLocalizer().getMessageSanitized(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resourceIds.size()));
}
}
TransactionDetails transactionDetails = new TransactionDetails();
List<ResourceTable> deletedResources = new ArrayList<>();
for (ResourcePersistentId pid : resourceIds) {
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid.getId());
deletedResources.add(entity);
T resourceToDelete = toResource(myResourceType, entity, null, false);
// Notify IServerOperationInterceptors about pre-action call
HookParams hooks = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, transactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRESTORAGE_RESOURCE_DELETED, hooks);
myDeleteConflictService.validateOkToDelete(deleteConflicts, entity, false, theRequest, transactionDetails);
// Notify interceptors
IdDt idToDelete = entity.getIdDt();
if (theRequest != null) {
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequest, idToDelete.getResourceType(), idToDelete);
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
}
// Perform delete
updateEntityForDelete(theRequest, transactionDetails, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
HookParams hookParams = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, transactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED, hookParams);
}
});
}
IBaseOperationOutcome oo;
if (deletedResources.isEmpty()) {
oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessageSanitized(BaseHapiFhirResourceDao.class, "unableToDeleteNotFound", theUrl);
String severity = "warning";
String code = "not-found";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
} else {
oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", deletedResources.size(), w.getMillis());
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
}
ourLog.debug("Processed delete on {} (matched {} resource(s)) in {}ms", theUrl, deletedResources.size(), w.getMillis());
DeleteMethodOutcome retVal = new DeleteMethodOutcome();
retVal.setDeletedEntities(deletedResources);
retVal.setOperationOutcome(oo);
return retVal;
}
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, RequestDetails theRequestDetails) {
validateDeleteEnabled();
DeleteConflictList deleteConflicts = new DeleteConflictList();
DeleteMethodOutcome outcome = deleteByUrl(theUrl, deleteConflicts, theRequestDetails);
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
return outcome;
}
private void validateDeleteEnabled() {
if (!myDaoConfig.isDeleteEnabled()) {
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "deleteBlockedBecauseDisabled");
throw new PreconditionFailedException(msg);
}
}
private void validateIdPresentForDelete(IIdType theId) {
if (theId == null || !theId.hasIdPart()) {
throw new InvalidRequestException("Can not perform delete, no ID provided");
}
}
@PostConstruct
public void detectSearchDaoDisabled() {
if (mySearchDao != null && mySearchDao.isDisabled()) {
mySearchDao = null;
}
}
private DaoMethodOutcome doCreate(T theResource, String theIfNoneExist, boolean thePerformIndexing, TransactionDetails theTransactionDetails, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
private DaoMethodOutcome doCreateForPostOrPut(T theResource, String theIfNoneExist, boolean thePerformIndexing, TransactionDetails theTransactionDetails, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
StopWatch w = new StopWatch();
preProcessResourceForStorage(theResource);
@ -579,6 +368,239 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return outcome;
}
private IInstanceValidatorModule getInstanceValidator() {
return myInstanceValidator;
}
@Override
public DaoMethodOutcome delete(IIdType theId) {
return delete(theId, null);
}
@Override
public DaoMethodOutcome delete(IIdType theId, RequestDetails theRequestDetails) {
validateIdPresentForDelete(theId);
validateDeleteEnabled();
return myTransactionService.execute(theRequestDetails, tx -> {
DeleteConflictList deleteConflicts = new DeleteConflictList();
if (isNotBlank(theId.getValue())) {
deleteConflicts.setResourceIdMarkedForDeletion(theId);
}
StopWatch w = new StopWatch();
DaoMethodOutcome retVal = delete(theId, deleteConflicts, theRequestDetails, new TransactionDetails());
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
ourLog.debug("Processed delete on {} in {}ms", theId.getValue(), w.getMillisAndRestart());
return retVal;
});
}
@Override
public DaoMethodOutcome delete(IIdType theId, DeleteConflictList theDeleteConflicts, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails) {
validateIdPresentForDelete(theId);
validateDeleteEnabled();
final ResourceTable entity = readEntityLatestVersion(theId, theRequestDetails);
if (theId.hasVersionIdPart() && Long.parseLong(theId.getVersionIdPart()) != entity.getVersion()) {
throw new ResourceVersionConflictException("Trying to delete " + theId + " but this is not the current version");
}
// Don't delete again if it's already deleted
if (entity.getDeleted() != null) {
DaoMethodOutcome outcome = new DaoMethodOutcome();
outcome.setEntity(entity);
IIdType id = getContext().getVersion().newIdType();
id.setValue(entity.getIdDt().getValue());
outcome.setId(id);
IBaseOperationOutcome oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", 1, 0);
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
outcome.setOperationOutcome(oo);
return outcome;
}
StopWatch w = new StopWatch();
T resourceToDelete = toResource(myResourceType, entity, null, false);
theDeleteConflicts.setResourceIdMarkedForDeletion(theId);
// Notify IServerOperationInterceptors about pre-action call
HookParams hook = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
.add(TransactionDetails.class, theTransactionDetails);
doCallHooks(theRequestDetails, Pointcut.STORAGE_PRESTORAGE_RESOURCE_DELETED, hook);
myDeleteConflictService.validateOkToDelete(theDeleteConflicts, entity, false, theRequestDetails, theTransactionDetails);
preDelete(resourceToDelete, entity);
// Notify interceptors
if (theRequestDetails != null) {
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequestDetails, getContext(), theId.getResourceType(), theId);
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
}
ResourceTable savedEntity = updateEntityForDelete(theRequestDetails, theTransactionDetails, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
HookParams hookParams = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
.add(TransactionDetails.class, theTransactionDetails);
doCallHooks(theRequestDetails, Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED, hookParams);
}
});
DaoMethodOutcome outcome = toMethodOutcome(theRequestDetails, savedEntity, resourceToDelete).setCreated(true);
IBaseOperationOutcome oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", 1, w.getMillis());
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
outcome.setOperationOutcome(oo);
return outcome;
}
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, RequestDetails theRequestDetails) {
validateDeleteEnabled();
return myTransactionService.execute(theRequestDetails, tx -> {
DeleteConflictList deleteConflicts = new DeleteConflictList();
DeleteMethodOutcome outcome = deleteByUrl(theUrl, deleteConflicts, theRequestDetails);
DeleteConflictService.validateDeleteConflictsEmptyOrThrowException(getContext(), deleteConflicts);
return outcome;
});
}
/**
* This method gets called by {@link #deleteByUrl(String, RequestDetails)} as well as by
* transaction processors
*/
@Override
public DeleteMethodOutcome deleteByUrl(String theUrl, DeleteConflictList deleteConflicts, RequestDetails theRequestDetails) {
validateDeleteEnabled();
return doDeleteByUrl(theUrl, deleteConflicts, theRequestDetails);
}
@Nonnull
private DeleteMethodOutcome doDeleteByUrl(String theUrl, DeleteConflictList deleteConflicts, RequestDetails theRequest) {
StopWatch w = new StopWatch();
Set<ResourcePersistentId> resourceIds = myMatchResourceUrlService.processMatchUrl(theUrl, myResourceType, theRequest);
if (resourceIds.size() > 1) {
if (myDaoConfig.isAllowMultipleDelete() == false) {
throw new PreconditionFailedException(getContext().getLocalizer().getMessageSanitized(BaseHapiFhirDao.class, "transactionOperationWithMultipleMatchFailure", "DELETE", theUrl, resourceIds.size()));
}
}
TransactionDetails transactionDetails = new TransactionDetails();
List<ResourceTable> deletedResources = new ArrayList<>();
for (ResourcePersistentId pid : resourceIds) {
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid.getId());
deletedResources.add(entity);
T resourceToDelete = toResource(myResourceType, entity, null, false);
// Notify IServerOperationInterceptors about pre-action call
HookParams hooks = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, transactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRESTORAGE_RESOURCE_DELETED, hooks);
myDeleteConflictService.validateOkToDelete(deleteConflicts, entity, false, theRequest, transactionDetails);
// Notify interceptors
IdDt idToDelete = entity.getIdDt();
if (theRequest != null) {
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequest, idToDelete.getResourceType(), idToDelete);
notifyInterceptors(RestOperationTypeEnum.DELETE, requestDetails);
}
// Perform delete
updateEntityForDelete(theRequest, transactionDetails, entity);
resourceToDelete.setId(entity.getIdDt());
// Notify JPA interceptors
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
HookParams hookParams = new HookParams()
.add(IBaseResource.class, resourceToDelete)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest)
.add(TransactionDetails.class, transactionDetails);
doCallHooks(theRequest, Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED, hookParams);
}
});
}
IBaseOperationOutcome oo;
if (deletedResources.isEmpty()) {
oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessageSanitized(BaseHapiFhirResourceDao.class, "unableToDeleteNotFound", theUrl);
String severity = "warning";
String code = "not-found";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
} else {
oo = OperationOutcomeUtil.newInstance(getContext());
String message = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulDeletes", deletedResources.size(), w.getMillis());
String severity = "information";
String code = "informational";
OperationOutcomeUtil.addIssue(getContext(), oo, severity, message, null, code);
}
ourLog.debug("Processed delete on {} (matched {} resource(s)) in {}ms", theUrl, deletedResources.size(), w.getMillis());
DeleteMethodOutcome retVal = new DeleteMethodOutcome();
retVal.setDeletedEntities(deletedResources);
retVal.setOperationOutcome(oo);
return retVal;
}
private void validateDeleteEnabled() {
if (!myDaoConfig.isDeleteEnabled()) {
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "deleteBlockedBecauseDisabled");
throw new PreconditionFailedException(msg);
}
}
private void validateIdPresentForDelete(IIdType theId) {
if (theId == null || !theId.hasIdPart()) {
throw new InvalidRequestException("Can not perform delete, no ID provided");
}
}
@PostConstruct
public void detectSearchDaoDisabled() {
if (mySearchDao != null && mySearchDao.isDisabled()) {
mySearchDao = null;
}
}
private <MT extends IBaseMetaType> void doMetaAdd(MT theMetaAdd, BaseHasResource entity) {
List<TagDefinition> tags = toTagList(theMetaAdd);
@ -647,7 +669,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public ExpungeOutcome forceExpungeInExistingTransaction(IIdType theId, ExpungeOptions theExpungeOptions, RequestDetails theRequest) {
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
@ -694,6 +715,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public IBundleProvider history(Date theSince, Date theUntil, RequestDetails theRequestDetails) {
// Notify interceptors
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequestDetails);
@ -706,10 +728,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public IBundleProvider history(final IIdType theId, final Date theSince, Date theUntil, RequestDetails theRequest) {
// Notify interceptors
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequest, getResourceName(), theId);
notifyInterceptors(RestOperationTypeEnum.HISTORY_INSTANCE, requestDetails);
if (theRequest != null) {
// Notify interceptors
ActionRequestDetails requestDetails = new ActionRequestDetails(theRequest, getResourceName(), theId);
notifyInterceptors(RestOperationTypeEnum.HISTORY_INSTANCE, requestDetails);
}
StopWatch w = new StopWatch();
@ -755,6 +780,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public <MT extends IBaseMetaType> MT metaAddOperation(IIdType theResourceId, MT theMetaAdd, RequestDetails theRequest) {
// Notify interceptors
if (theRequest != null) {
@ -787,6 +813,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public <MT extends IBaseMetaType> MT metaDeleteOperation(IIdType theResourceId, MT theMetaDel, RequestDetails theRequest) {
// Notify interceptors
if (theRequest != null) {
@ -821,6 +848,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public <MT extends IBaseMetaType> MT metaGetOperation(Class<MT> theType, IIdType theId, RequestDetails theRequest) {
// Notify interceptors
if (theRequest != null) {
@ -842,6 +870,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public <MT extends IBaseMetaType> MT metaGetOperation(Class<MT> theType, RequestDetails theRequestDetails) {
// Notify interceptors
if (theRequestDetails != null) {
@ -859,7 +888,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public DaoMethodOutcome patch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest) {
return myTransactionService.execute(theRequest, tx -> doPatch(theId, theConditionalUrl, thePatchType, thePatchBody, theFhirPatchBody, theRequest));
}
private DaoMethodOutcome doPatch(IIdType theId, String theConditionalUrl, PatchTypeEnum thePatchType, String thePatchBody, IBaseParameters theFhirPatchBody, RequestDetails theRequest) {
ResourceTable entityToUpdate;
if (isNotBlank(theConditionalUrl)) {
@ -890,10 +922,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
IBaseResource destination;
switch (thePatchType) {
case JSON_PATCH:
destination = JsonPatchUtils.apply(getContext(), resourceToUpdate, thePatchBody);
destination = JsonPatchUtils.apply(getContext(), resourceToUpdate, thePatchBody);
break;
case XML_PATCH:
destination = XmlPatchUtils.apply(getContext(), resourceToUpdate, thePatchBody);
destination = XmlPatchUtils.apply(getContext(), resourceToUpdate, thePatchBody);
break;
case FHIR_PATCH_XML:
case FHIR_PATCH_JSON:
@ -932,6 +964,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public T readByPid(ResourcePersistentId thePid) {
StopWatch w = new StopWatch();
@ -950,16 +983,19 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public T read(IIdType theId) {
return read(theId, null);
}
@Override
@Transactional
public T read(IIdType theId, RequestDetails theRequestDetails) {
return read(theId, theRequestDetails, false);
}
@Override
@Transactional
public T read(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) {
validateResourceTypeAndThrowInvalidRequestException(theId);
@ -1012,11 +1048,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
@Transactional
public BaseHasResource readEntity(IIdType theId, RequestDetails theRequest) {
return readEntity(theId, true, theRequest);
}
@Override
@Transactional
public BaseHasResource readEntity(IIdType theId, boolean theCheckForForcedId, RequestDetails theRequest) {
validateResourceTypeAndThrowInvalidRequestException(theId);
@ -1099,6 +1137,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public void reindex(T theResource, ResourceTable theEntity) {
assert TransactionSynchronizationManager.isActualTransactionActive();
ourLog.debug("Indexing resource {} - PID {}", theEntity.getIdDt().getValue(), theEntity.getId());
if (theResource != null) {
CURRENTLY_REINDEXING.put(theResource, Boolean.TRUE);
@ -1111,11 +1151,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
}
@Transactional
@Override
public void removeTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm) {
removeTag(theId, theTagType, theScheme, theTerm, null);
}
@Transactional
@Override
public void removeTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, RequestDetails theRequest) {
// Notify interceptors
@ -1216,26 +1258,28 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Override
public Set<ResourcePersistentId> searchForIds(SearchParameterMap theParams, RequestDetails theRequest) {
theParams.setLoadSynchronousUpTo(10000);
return myTransactionService.execute(theRequest, tx -> {
theParams.setLoadSynchronousUpTo(10000);
ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
HashSet<ResourcePersistentId> retVal = new HashSet<>();
HashSet<ResourcePersistentId> retVal = new HashSet<>();
String uuid = UUID.randomUUID().toString();
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
String uuid = UUID.randomUUID().toString();
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequest, getResourceName());
try (IResultIterator iter = builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId)) {
while (iter.hasNext()) {
retVal.add(iter.next());
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequest, getResourceName());
try (IResultIterator iter = builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId)) {
while (iter.hasNext()) {
retVal.add(iter.next());
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
}
return retVal;
return retVal;
});
}
protected <MT extends IBaseMetaType> MT toMetaDt(Class<MT> theType, Collection<TagDefinition> tagDefinitions) {
@ -1303,10 +1347,21 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "missingBody");
throw new InvalidRequestException(msg);
}
assert theResource.getIdElement().hasIdPart() || isNotBlank(theMatchUrl);
return myTransactionService.execute(theRequest, tx -> doUpdate(theResource, theMatchUrl, thePerformIndexing, theForceUpdateVersion, theRequest, theTransactionDetails));
}
private DaoMethodOutcome doUpdate(T theResource, String theMatchUrl, boolean thePerformIndexing, boolean theForceUpdateVersion, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
StopWatch w = new StopWatch();
preProcessResourceForStorage(theResource);
T resource = theResource;
if (JpaInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_VERSION_CONFLICT, myInterceptorBroadcaster, theRequest)) {
resource = (T) getContext().getResourceDefinition(theResource).newInstance();
getContext().newTerser().cloneInto(theResource, resource, false);
}
preProcessResourceForStorage(resource);
final ResourceTable entity;
@ -1321,7 +1376,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
entity = myEntityManager.find(ResourceTable.class, pid.getId());
resourceId = entity.getIdDt();
} else {
return create(theResource, null, thePerformIndexing, theTransactionDetails, theRequest);
return create(resource, null, thePerformIndexing, theTransactionDetails, theRequest);
}
} else {
/*
@ -1330,13 +1385,15 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
* BaseOutcomeReturningMethodBindingWithResourceParam
*/
resourceId = theResource.getIdElement();
assert resourceId != null;
assert resourceId.hasIdPart();
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequest, getResourceName());
try {
entity = readEntityLatestVersion(resourceId, requestPartitionId);
} catch (ResourceNotFoundException e) {
requestPartitionId = myRequestPartitionHelperService.determineCreatePartitionForRequest(theRequest, theResource, getResourceName());
return doCreate(theResource, null, thePerformIndexing, theTransactionDetails, theRequest, requestPartitionId);
return doCreateForPostOrPut(resource, null, thePerformIndexing, theTransactionDetails, theRequest, requestPartitionId);
}
}
@ -1372,8 +1429,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
* directly. So we just bail now.
*/
if (!thePerformIndexing) {
theResource.setId(entity.getIdDt().getValue());
DaoMethodOutcome outcome = toMethodOutcome(theRequest, entity, theResource).setCreated(wasDeleted);
resource.setId(entity.getIdDt().getValue());
DaoMethodOutcome outcome = toMethodOutcome(theRequest, entity, resource).setCreated(wasDeleted);
outcome.setPreviousResource(oldResource);
return outcome;
}
@ -1381,11 +1438,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
/*
* Otherwise, we're not in a transaction
*/
ResourceTable savedEntity = updateInternal(theRequest, theResource, thePerformIndexing, theForceUpdateVersion, entity, resourceId, oldResource, theTransactionDetails);
DaoMethodOutcome outcome = toMethodOutcome(theRequest, savedEntity, theResource).setCreated(wasDeleted);
ResourceTable savedEntity = updateInternal(theRequest, resource, thePerformIndexing, theForceUpdateVersion, entity, resourceId, oldResource, theTransactionDetails);
DaoMethodOutcome outcome = toMethodOutcome(theRequest, savedEntity, resource).setCreated(wasDeleted);
if (!thePerformIndexing) {
outcome.setId(theResource.getIdElement());
IIdType id = getContext().getVersion().newIdType();
id.setValue(entity.getIdDt().getValue());
outcome.setId(id);
}
String msg = getContext().getLocalizer().getMessageSanitized(BaseHapiFhirResourceDao.class, "successfulUpdate", outcome.getId(), w.getMillisAndRestart());

View File

@ -32,6 +32,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.DeleteConflict;
import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.delete.DeleteConflictService;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@ -64,7 +65,6 @@ import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu3.model.Bundle;
@ -83,6 +83,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
@ -121,6 +122,8 @@ public abstract class BaseTransactionProcessor {
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private MatchResourceUrlService myMatchResourceUrlService;
@Autowired
private HapiTransactionService myHapiTransactionService;
@PostConstruct
public void start() {
@ -344,9 +347,6 @@ public abstract class BaseTransactionProcessor {
final TransactionDetails transactionDetails = new TransactionDetails();
final StopWatch transactionStopWatch = new StopWatch();
final Set<IIdType> allIds = new LinkedHashSet<>();
final Map<IIdType, IIdType> idSubstitutions = new HashMap<>();
final Map<IIdType, DaoMethodOutcome> idToPersistedOutcome = new HashMap<>();
List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest);
// Do all entries have a verb?
@ -403,13 +403,16 @@ public abstract class BaseTransactionProcessor {
* heavy load with lots of concurrent transactions using all available
* database connections.
*/
TransactionTemplate txManager = new TransactionTemplate(myTxManager);
Map<IBase, IBasePersistedResource> entriesToProcess = txManager.execute(status -> {
TransactionCallback<Map<IBase, IBasePersistedResource>> txCallback = status -> {
final Set<IIdType> allIds = new LinkedHashSet<>();
final Map<IIdType, IIdType> idSubstitutions = new HashMap<>();
final Map<IIdType, DaoMethodOutcome> idToPersistedOutcome = new HashMap<>();
Map<IBase, IBasePersistedResource> retVal = doTransactionWriteOperations(theRequestDetails, theActionName, transactionDetails, allIds, idSubstitutions, idToPersistedOutcome, response, originalRequestOrder, entries, transactionStopWatch);
transactionStopWatch.startTask("Commit writes to database");
return retVal;
});
};
Map<IBase, IBasePersistedResource> entriesToProcess = myHapiTransactionService.execute(theRequestDetails, txCallback);
transactionStopWatch.endCurrentTask();
for (Map.Entry<IBase, IBasePersistedResource> nextEntry : entriesToProcess.entrySet()) {

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.delete.DeleteConflictService;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
@ -73,7 +74,6 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@ -109,14 +109,13 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
private DaoRegistry myDaoRegistry;
@Autowired
private MatchResourceUrlService myMatchResourceUrlService;
@Autowired
private HapiTransactionService myHapiTransactionalService;
private Bundle batch(final RequestDetails theRequestDetails, Bundle theRequest) {
ourLog.info("Beginning batch with {} resources", theRequest.getEntry().size());
long start = System.currentTimeMillis();
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Bundle resp = new Bundle();
resp.setType(BundleTypeEnum.BATCH_RESPONSE);
@ -144,7 +143,7 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
// create their own
nextResponseBundle = callback.doInTransaction(null);
} else {
nextResponseBundle = txTemplate.execute(callback);
nextResponseBundle = myHapiTransactionalService.execute(theRequestDetails, callback);
}
caughtEx = null;

View File

@ -90,6 +90,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
@ -245,6 +246,7 @@ public class SearchBuilder implements ISearchBuilder {
@Override
public Iterator<Long> createCountQuery(SearchParameterMap theParams, String theSearchUuid, RequestDetails theRequest, @Nonnull RequestPartitionId theRequestPartitionId) {
assert theRequestPartitionId != null;
assert TransactionSynchronizationManager.isActualTransactionActive();
init(theParams, theSearchUuid, theRequestPartitionId);
@ -263,6 +265,7 @@ public class SearchBuilder implements ISearchBuilder {
@Override
public IResultIterator createQuery(SearchParameterMap theParams, SearchRuntimeDetails theSearchRuntimeDetails, RequestDetails theRequest, @Nonnull RequestPartitionId theRequestPartitionId) {
assert theRequestPartitionId != null;
assert TransactionSynchronizationManager.isActualTransactionActive();
init(theParams, theSearchRuntimeDetails.getSearchUuid(), theRequestPartitionId);

View File

@ -138,6 +138,8 @@ public class IdHelperService {
*/
@Nonnull
public ResourcePersistentId resolveResourcePersistentIds(@Nonnull RequestPartitionId theRequestPartitionId, String theResourceType, String theId) {
Validate.notNull(theId, "theId must not be null");
Long retVal;
if (myDaoConfig.getResourceClientIdStrategy() == DaoConfig.ClientIdStrategyEnum.ANY || !isValidPid(theId)) {
if (myDaoConfig.isDeleteEnabled()) {

View File

@ -0,0 +1,83 @@
package ca.uhn.fhir.jpa.dao.tx;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
public class HapiTransactionService {
private static final Logger ourLog = LoggerFactory.getLogger(HapiTransactionService.class);
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private PlatformTransactionManager myTransactionManager;
private TransactionTemplate myTxTemplate;
@PostConstruct
public void start() {
myTxTemplate = new TransactionTemplate(myTransactionManager);
}
public <T> T execute(RequestDetails theRequestDetails, TransactionCallback<T> theCallback) {
for (int i = 0; ; i++) {
try {
try {
return myTxTemplate.execute(theCallback);
} catch (MyException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new InternalErrorException(e);
}
}
} catch (ResourceVersionConflictException e) {
ourLog.debug("Version conflict detected: {}", e.toString());
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
ResourceVersionConflictResolutionStrategy conflictResolutionStrategy = (ResourceVersionConflictResolutionStrategy) JpaInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_VERSION_CONFLICT, params);
if (conflictResolutionStrategy != null && conflictResolutionStrategy.isRetry()) {
if (i <= conflictResolutionStrategy.getMaxRetries()) {
continue;
}
ourLog.info("Max retries ({}) exceeded for version conflict", conflictResolutionStrategy.getMaxRetries());
}
throw e;
}
}
}
/**
* This is just an unchecked exception so that we can catch checked exceptions inside TransactionTemplate
* and rethrow them outside of it
*/
static class MyException extends RuntimeException {
public MyException(Throwable theThrowable) {
super(theThrowable);
}
}
}

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.dao.tx;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* @see HapiTransactionalAspect
* @since 5.1.0
*/
@Retention(RUNTIME)
@Target({METHOD, TYPE})
public @interface HapiTransactional {
}

View File

@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.List;

View File

@ -0,0 +1,62 @@
package ca.uhn.fhir.jpa.interceptor;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.StringTokenizer;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.trim;
/**
* This interceptor looks for a header on incoming requests called <code>X-Retry-On-Version-Conflict</code> and
* if present, it will instruct the server to automatically retry JPA server operations that would have
* otherwise failed with a {@link ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException} (HTTP 409).
* <p>
* The format of the header is:<br/>
* <code>X-Retry-On-Version-Conflict: retry; max-retries=100</code>
* </p>
*/
@Interceptor
public class UserRequestRetryVersionConflictsInterceptor {
public static final String HEADER_NAME = "X-Retry-On-Version-Conflict";
public static final String MAX_RETRIES = "max-retries";
public static final String RETRY = "retry";
@Hook(value = Pointcut.STORAGE_VERSION_CONFLICT, order = 100)
public ResourceVersionConflictResolutionStrategy check(RequestDetails theRequestDetails) {
ResourceVersionConflictResolutionStrategy retVal = new ResourceVersionConflictResolutionStrategy();
if (theRequestDetails != null) {
for (String headerValue : theRequestDetails.getHeaders(HEADER_NAME)) {
if (isNotBlank(headerValue)) {
StringTokenizer tok = new StringTokenizer(headerValue, ";");
while (tok.hasMoreTokens()) {
String next = trim(tok.nextToken());
if (next.equals(RETRY)) {
retVal.setRetry(true);
} else if (next.startsWith(MAX_RETRIES + "=")) {
String val = trim(next.substring((MAX_RETRIES + "=").length()));
int maxRetries = Integer.parseInt(val);
maxRetries = Math.min(100, maxRetries);
retVal.setMaxRetries(maxRetries);
}
}
}
}
}
return retVal;
}
}

View File

@ -35,9 +35,11 @@ import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.transaction.Transactional;
import java.util.HashSet;
import static ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster.doCallHooks;

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.IInstanceValidatorModule;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.ThreadQueryCountHolder;
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.dialect.H2Dialect;

View File

@ -56,6 +56,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
private Exception myLastStackTrace;
@Override
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();

View File

@ -9,8 +9,8 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.config.BaseConfig;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.config.BaseConfig;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
@ -22,6 +22,8 @@ import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.model.dstu2.resource.Bundle;
@ -29,6 +31,7 @@ import ca.uhn.fhir.model.dstu2.resource.Bundle.Entry;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.test.BaseTest;
import ca.uhn.fhir.test.utilities.LoggingExtension;
@ -59,7 +62,6 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.test.context.TestPropertySource;
import org.springframework.transaction.PlatformTransactionManager;
@ -128,6 +130,10 @@ public abstract class BaseJpaTest extends BaseTest {
@Autowired
protected IPartitionLookupSvc myPartitionConfigSvc;
@Autowired
protected SubscriptionRegistry mySubscriptionRegistry;
@Autowired
protected SubscriptionLoader mySubscriptionLoader;
@Autowired
private IdHelperService myIdHelperService;
@Autowired
private MemoryCacheService myMemoryCacheService;
@ -419,6 +425,23 @@ public abstract class BaseJpaTest extends BaseTest {
return retVal.toArray(new String[0]);
}
protected void waitForActivatedSubscriptionCount(int theSize) throws Exception {
for (int i = 0; ; i++) {
if (i == 10) {
fail("Failed to init subscriptions");
}
try {
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
break;
} catch (ResourceVersionConflictException e) {
Thread.sleep(250);
}
}
TestUtil.waitForSize(theSize, () -> mySubscriptionRegistry.size());
Thread.sleep(500);
}
@BeforeAll
public static void beforeClassRandomizeLocale() {
randomizeLocale();

View File

@ -590,19 +590,27 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
@Test
public void testDeleteFailsIfIncomingLinks() {
String methodName = "testDeleteFailsIfIncomingLinks";
SearchParameterMap map;
List<IIdType> found;
Organization org = new Organization();
org.setName(methodName);
IIdType orgId = myOrganizationDao.create(org, mySrd).getId().toUnqualifiedVersionless();
map = SearchParameterMap.newSynchronous();
map.add("_id", new StringParam(orgId.getIdPart()));
map.addRevInclude(new Include("*"));
found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));
assertThat(found, contains(orgId));
Patient patient = new Patient();
patient.addName().addFamily(methodName);
patient.getManagingOrganization().setReference(orgId);
IIdType patId = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless();
SearchParameterMap map = new SearchParameterMap();
map.add("_id", new StringParam(orgId.getIdPart()));
map.addRevInclude(new Include("*"));
List<IIdType> found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));
found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));
assertThat(found, contains(orgId, patId));
try {
@ -613,9 +621,21 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
}
map = SearchParameterMap.newSynchronous();
map.add("_id", new StringParam(orgId.getIdPart()));
map.addRevInclude(new Include("*"));
ourLog.info("***** About to perform search");
found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));
runInTransaction(()->{
ourLog.info("Resources:\n * {}", myResourceTableDao.findAll().stream().map(t->t.toString()).collect(Collectors.joining("\n * ")));
});
assertThat(found.toString(), found, contains(orgId, patId));
myPatientDao.delete(patId, mySrd);
map = new SearchParameterMap();
map = SearchParameterMap.newSynchronous();
map.add("_id", new StringParam(orgId.getIdPart()));
map.addRevInclude(new Include("*"));
found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));
@ -623,7 +643,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
myOrganizationDao.delete(orgId, mySrd);
map = new SearchParameterMap();
map = SearchParameterMap.newSynchronous();
map.add("_id", new StringParam(orgId.getIdPart()));
map.addRevInclude(new Include("*"));
found = toUnqualifiedVersionlessIds(myOrganizationDao.search(map));

View File

@ -447,8 +447,6 @@ public abstract class BaseJpaR4Test extends BaseJpaTest implements ITestDataBuil
protected ITermConceptMapGroupElementTargetDao myTermConceptMapGroupElementTargetDao;
@Autowired
protected ICacheWarmingSvc myCacheWarmingSvc;
@Autowired
protected SubscriptionRegistry mySubscriptionRegistry;
protected IServerInterceptor myInterceptor;
@Autowired
protected DaoRegistry myDaoRegistry;

View File

@ -163,7 +163,7 @@ public class BaseR4SearchLastN extends BaseJpaTest {
}
private Date calculateObservationDateFromOffset(Integer theTimeOffset, Integer theObservationIndex) {
int milliSecondsPerHour = 3600 * 1000;
long milliSecondsPerHour = 3600L * 1000L;
// Generate a Date by subtracting a calculated number of hours from the static observationDate property.
return new Date(observationDate.getTimeInMillis() - (milliSecondsPerHour * (theTimeOffset + theObservationIndex)));
}

View File

@ -0,0 +1,442 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.PatchTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked", "deprecation", "Duplicates"})
public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4ConcurrentWriteTest.class);
private ExecutorService myExecutor;
private UserRequestRetryVersionConflictsInterceptor myRetryInterceptor;
@BeforeEach
public void before() {
myExecutor = Executors.newFixedThreadPool(10);
myRetryInterceptor = new UserRequestRetryVersionConflictsInterceptor();
RestfulServer server = new RestfulServer(myFhirCtx);
when(mySrd.getServer()).thenReturn(server);
}
@AfterEach
public void after() {
myExecutor.shutdown();
myInterceptorRegistry.unregisterInterceptor(myRetryInterceptor);
}
@Test
public void testCreateWithClientAssignedId() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
String value = UserRequestRetryVersionConflictsInterceptor.RETRY + "; " + UserRequestRetryVersionConflictsInterceptor.MAX_RETRIES + "=10";
when(mySrd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.singletonList(value));
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p, mySrd);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (Exception e) {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testCreateWithUniqueConstraint() {
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/patient-gender");
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setCode("gender");
sp.setExpression("Patient.gender");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/patient-gender-unique");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
sp.addComponent()
.setExpression("Patient")
.setDefinition("SearchParameter/patient-gender");
sp.addExtension()
.setUrl(HapiExtensions.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParamRegistry.forceRefresh();
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setGender(Enumerations.AdministrativeGender.MALE);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> {
try {
myPatientDao.create(p);
} catch (PreconditionFailedException e) {
// expected - This is as a result of the unique SP
assertThat(e.getMessage(), containsString("duplicate index matching query: Patient?gender=http%3A%2F%2Fhl7.org%2Ffhir%2Fadministrative-gender%7Cmale"));
} catch (ResourceVersionConflictException e) {
// expected - This is as a result of the unique SP
assertThat(e.getMessage(), containsString("would have resulted in a duplicate value for a unique index"));
}
};
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (Exception e) {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
runInTransaction(() -> {
ourLog.info("Uniques:\n * " + myResourceIndexedCompositeStringUniqueDao.findAll().stream().map(t -> t.toString()).collect(Collectors.joining("\n * ")));
});
// Make sure we saved the object
myCaptureQueriesListener.clear();
IBundleProvider search = myPatientDao.search(SearchParameterMap.newSynchronous("gender", new TokenParam("http://hl7.org/fhir/administrative-gender", "male")));
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertEquals(1, search.sizeOrThrowNpe());
}
@Test
public void testDelete() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
String value = UserRequestRetryVersionConflictsInterceptor.RETRY + "; " + UserRequestRetryVersionConflictsInterceptor.MAX_RETRIES + "=10";
when(mySrd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.singletonList(value));
IIdType patientId = runInTransaction(() -> {
Patient p = new Patient();
p.setActive(true);
return myPatientDao.create(p).getId().toUnqualifiedVersionless();
});
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// Submit an update
Patient p = new Patient();
p.setId(patientId);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p, mySrd);
Future<?> future = myExecutor.submit(task);
futures.add(future);
// Submit a delete
task = () -> myPatientDao.delete(patientId, mySrd);
future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (Exception e) {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
// Make sure we saved the object
IBundleProvider patient = myPatientDao.history(patientId, null, null, null);
assertThat(patient.sizeOrThrowNpe(), greaterThanOrEqualTo(3));
}
@Test
public void testNoRetryRequest() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
when(mySrd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.emptyList());
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p, mySrd);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof ResourceVersionConflictException) {
// this is expected since we're not retrying
ourLog.info("Version conflict (expected): {}", e.getCause().toString());
} else {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testNoRetryInterceptor() {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p, mySrd);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof ResourceVersionConflictException) {
// this is expected since we're not retrying
ourLog.info("Version conflict (expected): {}", e.getCause().toString());
} else {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testNoRequestDetails() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
when(mySrd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.emptyList());
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof ResourceVersionConflictException) {
// this is expected since we're not retrying
ourLog.info("Version conflict (expected): {}", e.getCause().toString());
} else {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testPatch() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
String value = UserRequestRetryVersionConflictsInterceptor.RETRY + "; " + UserRequestRetryVersionConflictsInterceptor.MAX_RETRIES + "=10";
when(mySrd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.singletonList(value));
Patient p = new Patient();
p.addName().setFamily("FAMILY");
IIdType pId = myPatientDao.create(p).getId().toUnqualifiedVersionless();
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Parameters patch = new Parameters();
Parameters.ParametersParameterComponent operation = patch.addParameter();
operation.setName("operation");
operation
.addPart()
.setName("type")
.setValue(new CodeType("replace"));
operation
.addPart()
.setName("path")
.setValue(new StringType("Patient.name[0].family"));
operation
.addPart()
.setName("value")
.setValue(new StringType("FAMILY-" + i));
Runnable task = () -> myPatientDao.patch(pId, null, PatchTypeEnum.FHIR_PATCH_JSON, null, patch, mySrd);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (Exception e) {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(pId);
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals("11", patient.getMeta().getVersionId());
}
@Test
public void testTransactionWithCreate() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
ServletRequestDetails srd = mock(ServletRequestDetails.class);
String value = UserRequestRetryVersionConflictsInterceptor.RETRY + "; " + UserRequestRetryVersionConflictsInterceptor.MAX_RETRIES + "=10";
when(srd.getHeaders(eq(UserRequestRetryVersionConflictsInterceptor.HEADER_NAME))).thenReturn(Collections.singletonList(value));
when(srd.getUserData()).thenReturn(new HashMap<>());
when(srd.getServer()).thenReturn(new RestfulServer(myFhirCtx));
when(srd.getInterceptorBroadcaster()).thenReturn(new InterceptorService());
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Bundle bundle = new Bundle();
bundle.setType(Bundle.BundleType.TRANSACTION);
bundle
.addEntry()
.setResource(p)
.getRequest()
.setMethod(Bundle.HTTPVerb.PUT)
.setUrl("Patient/ABC");
Runnable task = () -> mySystemDao.transaction(srd, bundle);
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (Exception e) {
ourLog.info("Future produced exception: {}", e.toString());
throw new AssertionError("Failed with message: " + e.toString(), e);
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
}

View File

@ -441,7 +441,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseJpaR4Test {
Observation obs = new Observation();
obs.getSubject().setReference("Patient/P");
myObservationDao.update(obs);
myObservationDao.create(obs);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);
@ -482,7 +482,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseJpaR4Test {
Observation obs = new Observation();
obs.getSubject().setReference("Patient/P");
myObservationDao.update(obs);
myObservationDao.create(obs);
SearchParameterMap map = new SearchParameterMap();
map.setLoadSynchronous(true);

View File

@ -202,10 +202,6 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
return names;
}
protected void waitForActivatedSubscriptionCount(int theSize) throws Exception {
TestUtil.waitForSize(theSize, () -> mySubscriptionRegistry.size());
Thread.sleep(500);
}
@AfterAll
public static void afterClassClearContextBaseResourceProviderR4Test() throws Exception {

View File

@ -205,23 +205,6 @@ public abstract class BaseResourceProviderR5Test extends BaseJpaR5Test {
return names;
}
protected void waitForActivatedSubscriptionCount(int theSize) throws Exception {
for (int i = 0; ; i++) {
if (i == 10) {
fail("Failed to init subscriptions");
}
try {
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
break;
} catch (ResourceVersionConflictException e) {
Thread.sleep(250);
}
}
TestUtil.waitForSize(theSize, () -> mySubscriptionRegistry.size());
Thread.sleep(500);
}
@AfterAll
public static void afterClassClearContextBaseResourceProviderR5Test() throws Exception {
JettyUtil.closeServer(ourServer);

View File

@ -508,7 +508,7 @@ public class LastNElasticsearchSvcMultipleObservationsIT {
for (int entryCount = 0; entryCount < 10; entryCount++) {
ObservationJson observationJson = new ObservationJson();
String identifier = String.valueOf((entryCount + patientCount * 10));
String identifier = String.valueOf((entryCount + patientCount * 10L));
observationJson.setIdentifier(identifier);
observationJson.setSubject(subject);
@ -524,7 +524,7 @@ public class LastNElasticsearchSvcMultipleObservationsIT {
assertTrue(elasticsearchSvc.createOrUpdateObservationCodeIndex(codeableConceptId2, codeJson2));
}
Date effectiveDtm = new Date(baseObservationDate.getTimeInMillis() - ((10 - entryCount) * 3600 * 1000));
Date effectiveDtm = new Date(baseObservationDate.getTimeInMillis() - ((10L - entryCount) * 3600L * 1000L));
observationJson.setEffectiveDtm(effectiveDtm);
assertTrue(elasticsearchSvc.createOrUpdateObservationIndex(identifier, observationJson));

View File

@ -86,4 +86,7 @@ public class SubscriptionTestUtil {
subscriber.setEmailSender(myEmailSender);
}
public int getActiveSubscriptionCount() {
return mySubscriptionRegistry.size();
}
}

View File

@ -157,6 +157,7 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
waitForActivatedSubscriptionCount(2);
Observation observation1 = sendObservation(code, "SNOMED-CT");
@ -169,7 +170,6 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
ourLog.info("Current interceptors:\n * {}", allInterceptors);
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
@ -181,20 +181,21 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute();
waitForQueueToDrain();
ourLog.info("Have {} updates and {} subscriptions - sending observation", ourUpdatedObservations.size(), mySubscriptionTestUtil.getActiveSubscriptionCount());
Observation observation2 = sendObservation(code, "SNOMED-CT");
waitForQueueToDrain();
// Should see one subscription notification
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
// Delet one subscription
// Delete one subscription
ourClient.delete().resourceById(new IdDt("Subscription/" + subscription2.getId())).execute();
waitForActivatedSubscriptionCount(1);
ourLog.info("Have {} updates and {} subscriptions - sending observation", ourUpdatedObservations.size(), mySubscriptionTestUtil.getActiveSubscriptionCount());
Observation observationTemp3 = sendObservation(code, "SNOMED-CT");
// Should see only one subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
@ -204,6 +205,7 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
CodingDt coding = codeableConcept.addCoding();
coding.setCode(code + "111");
coding.setSystem("SNOMED-CT");
ourLog.info("Have {} updates and {} subscriptions - sending observation", ourUpdatedObservations.size(), mySubscriptionTestUtil.getActiveSubscriptionCount());
ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute();
// Should see no subscription notification
@ -218,6 +220,7 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
CodingDt coding1 = codeableConcept1.addCoding();
coding1.setCode(code);
coding1.setSystem("SNOMED-CT");
ourLog.info("Have {} updates and {} subscriptions - sending observation", ourUpdatedObservations.size(), mySubscriptionTestUtil.getActiveSubscriptionCount());
ourClient.update().resource(observation3a).withId(observation3a.getIdElement()).execute();
// Should see only one subscription notification

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.*;
@ -250,7 +251,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Manually unregister all subscriptions
mySubscriptionRegistry.unregisterAllSubscriptions();
waitForActivatedSubscriptionCount(0);
assertEquals(0, mySubscriptionRegistry.size());
// Force a reload
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();

View File

@ -21,9 +21,7 @@ import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -42,7 +40,6 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Test the rest-hook subscriptions
@ -90,23 +87,6 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu2Test extends B
mySubscriptionTestUtil.waitForQueueToDrain();
}
protected void waitForActivatedSubscriptionCount(int theSize) throws Exception {
for (int i = 0; ; i++) {
if (i == 10) {
fail("Failed to init subscriptions");
}
try {
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
break;
} catch (ResourceVersionConflictException e) {
Thread.sleep(250);
}
}
TestUtil.waitForSize(theSize, () -> mySubscriptionRegistry.size());
Thread.sleep(500);
}
private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
@ -232,8 +212,8 @@ public class RestHookTestWithInterceptorRegisteredToDaoConfigDstu2Test extends B
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
runInTransaction(()->{
ourLog.info("All token indexes:\n * {}", myResourceIndexedSearchParamTokenDao.findAll().stream().map(t->t.toString()).collect(Collectors.joining("\n * ")));
runInTransaction(() -> {
ourLog.info("All token indexes:\n * {}", myResourceIndexedSearchParamTokenDao.findAll().stream().map(t -> t.toString()).collect(Collectors.joining("\n * ")));
});
myCaptureQueriesListener.clear();

View File

@ -74,7 +74,11 @@ public class EmpiResourceDaoSvc {
}
public DaoMethodOutcome updatePerson(IAnyResource thePerson) {
return myPersonDao.update(thePerson);
if (thePerson.getIdElement().hasIdPart()) {
return myPersonDao.update(thePerson);
} else {
return myPersonDao.create(thePerson);
}
}
public IAnyResource readPersonByPid(ResourcePersistentId thePersonPid) {

View File

@ -49,7 +49,7 @@ import java.util.ArrayList;
import java.util.Collection;
@Entity
@Table(name = "HFJ_RES_VER", uniqueConstraints = {
@Table(name = ResourceHistoryTable.HFJ_RES_VER, uniqueConstraints = {
@UniqueConstraint(name = ResourceHistoryTable.IDX_RESVER_ID_VER, columnNames = {"RES_ID", "RES_VER"})
}, indexes = {
@Index(name = "IDX_RESVER_TYPE_DATE", columnList = "RES_TYPE,RES_UPDATED"),
@ -66,6 +66,7 @@ public class ResourceHistoryTable extends BaseHasResource implements Serializabl
// Don't reduce the visibility here, we reference this from Smile
@SuppressWarnings("WeakerAccess")
public static final int ENCODING_COL_LENGTH = 5;
public static final String HFJ_RES_VER = "HFJ_RES_VER";
private static final long serialVersionUID = 1L;
@Id

View File

@ -573,8 +573,11 @@ public class ResourceTable extends BaseHasResource implements Serializable, IBas
@Override
public String toString() {
ToStringBuilder b = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
b.append("resourceType", myResourceType);
b.append("pid", myId);
b.append("resourceType", myResourceType);
if (getDeleted() != null) {
b.append("deleted");
}
return b.build();
}

View File

@ -94,7 +94,7 @@ public class SubscriptionRegistry {
mySubscriptionChannelRegistry.add(activeSubscription);
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
ourLog.info("Registered active subscription {} - Have {} registered", subscriptionId, myActiveSubscriptionCache.size());
ourLog.info("Registered active subscription Subscription/{} - Have {} registered", subscriptionId, myActiveSubscriptionCache.size());
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
HookParams params = new HookParams()
@ -110,6 +110,11 @@ public class SubscriptionRegistry {
if (activeSubscription != null) {
mySubscriptionChannelRegistry.remove(activeSubscription);
ourLog.info("Unregistered active subscription {} - Have {} registered", theSubscriptionId, myActiveSubscriptionCache.size());
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED
HookParams params = new HookParams();
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params);
}
}

View File

@ -162,7 +162,7 @@ public class FhirContextDstu2Test {
});
}
// wait until all threads are ready
assertTrue(allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS), "Timeout initializing threads! Perform long lasting initializations before passing runnables to assertConcurrent");
assertTrue(allExecutorThreadsReady.await(runnables.size() * 10L, TimeUnit.MILLISECONDS), "Timeout initializing threads! Perform long lasting initializations before passing runnables to assertConcurrent");
// start all test runners
afterInitBlocker.countDown();
assertTrue(allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS), message + " timeout! More than" + maxTimeoutSeconds + "seconds");

View File

@ -119,6 +119,33 @@ public class FhirTerserDstu2Test {
}
@Test
public void testCloneIntoResourceCopiesId() {
Observation obs = new Observation();
obs.setId("http://foo/base/Observation/_history/123");
obs.setValue(new StringDt("AAA"));
Observation target = new Observation();
ourCtx.newTerser().cloneInto(obs, target, false);
assertEquals("http://foo/base/Observation/_history/123", target.getId().getValue());
}
@Test
public void testCloneIntoResourceCopiesElementId() {
Observation obs = new Observation();
StringDt string = new StringDt("AAA");
string.setId("BBB");
obs.setValue(string);
Observation target = new Observation();
ourCtx.newTerser().cloneInto(obs, target, false);
assertEquals("BBB", ((StringDt)target.getValue()).getElementSpecificId());
}
/**
* See #369
*/

View File

@ -13,6 +13,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Element;
import org.hl7.fhir.r4.model.Enumeration;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Extension;
@ -180,15 +181,43 @@ public class FhirTerserR4Test {
assertEquals("FOO", ((StringType) exts.get(0).getValue()).getValue());
}
@Test
public void testCloneIntoExtensionWithChildExtension() {
Patient patient = new Patient();
Extension ext = new Extension("http://example.com", new StringType("FOO"));
patient.addExtension((Extension) new Extension().setUrl("http://foo").addExtension(ext));
Patient target = new Patient();
ourCtx.newTerser().cloneInto(patient, target, false);
List<Extension> exts = target.getExtensionsByUrl("http://foo");
assertEquals(1, exts.size());
exts = exts.get(0).getExtensionsByUrl("http://example.com");
assertEquals("FOO", ((StringType) exts.get(0).getValue()).getValue());
}
@Test
public void testCloneEnumeration() {
Patient patient = new Patient();
patient.setGender(Enumerations.AdministrativeGender.MALE);
Patient target = new Patient();
ourCtx.newTerser().cloneInto(patient, target, false);
assertEquals("http://hl7.org/fhir/administrative-gender", target.getGenderElement().getSystem());
}
@Test
public void testCloneIntoPrimitive() {
StringType source = new StringType("STR");
source.setId("STRING_ID");
MarkdownType target = new MarkdownType();
ourCtx.newTerser().cloneInto(source, target, true);
assertEquals("STR", target.getValueAsString());
assertEquals("STRING_ID", target.getId());
}
@ -225,6 +254,35 @@ public class FhirTerserR4Test {
assertEquals("COMMENTS", obs.getNote().get(0).getText());
}
@Test
public void testCloneIntoResourceCopiesId() {
Observation obs = new Observation();
obs.setId("http://foo/base/Observation/_history/123");
obs.setValue(new StringType("AAA"));
obs.addNote().setText("COMMENTS");
Observation target = new Observation();
ourCtx.newTerser().cloneInto(obs, target, false);
assertEquals("http://foo/base/Observation/_history/123", target.getId());
}
@Test
public void testCloneIntoResourceCopiesElementId() {
Observation obs = new Observation();
StringType string = new StringType("AAA");
string.setId("BBB");
obs.setValue(string);
Observation target = new Observation();
ourCtx.newTerser().cloneInto(obs, target, false);
assertEquals("BBB", target.getValueStringType().getId());
}
@Test
public void testGetAllPopulatedChildElementsOfTypeDescendsIntoContained() {
Patient p = new Patient();

View File

@ -906,6 +906,11 @@
<artifactId>commons-csv</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>org.hl7.fhir.testcases</groupId>
<artifactId>fhir-test-cases</artifactId>